• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4926

13 Jan 2026 05:43AM UTC coverage: 66.053% (-0.05%) from 66.107%
#4926

push

travis-ci

web-flow
feat: [6654385780] show snap progress (#34203)

48 of 59 new or added lines in 7 files covered. (81.36%)

582 existing lines in 124 files now uncovered.

200362 of 303334 relevant lines covered (66.05%)

132283104.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.4
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Open a sync node for the given sync info and register it in the node table.
 *
 * Returns the reference id (rid) assigned by syncNodeAdd() on success, or -1
 * when the node cannot be opened or registered. On registration failure the
 * freshly opened node is closed again before returning.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion, pSyncInfo->electMs, pSyncInfo->heartbeatMs);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Copy the timing baselines and the message callbacks from the caller's info.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened, electBaseLine:%d, hbBaseLine:%d", pSyncInfo->vgId, pNode->electBaseLine,
        pNode->hbBaseLine);
  return pNode->rid;
}
88

89
int32_t syncStart(int64_t rid) {
4,222,532✔
90
  int32_t    code = 0;
4,222,532✔
91
  int32_t    vgId = 0;
4,222,532✔
92
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,222,532✔
93
  if (pSyncNode == NULL) {
4,222,532✔
94
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
95
    if (terrno != 0) code = terrno;
×
96
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
97
    TAOS_RETURN(code);
×
98
  }
99
  vgId = pSyncNode->vgId;
4,222,532✔
100
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
4,222,532✔
101

102
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
4,222,532✔
103
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
104
    goto _err;
×
105
  }
106
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
4,222,402✔
107

108
  if ((code = syncNodeStart(pSyncNode)) < 0) {
4,222,402✔
109
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
110
    goto _err;
×
111
  }
112
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
4,222,532✔
113

114
  syncNodeRelease(pSyncNode);
4,222,532✔
115

116
  sInfo("vgId:%d, sync started", vgId);
4,222,532✔
117

118
  TAOS_RETURN(code);
4,222,532✔
119

120
_err:
×
121
  syncNodeRelease(pSyncNode);
×
122
  TAOS_RETURN(code);
×
123
}
124

125
/*
 * Copy the current raft configuration of the node identified by rid into *cfg.
 *
 * Returns 0 on success. If the node cannot be acquired, returns terrno when it
 * is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL; *cfg is left untouched.
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
    TAOS_RETURN(err);
  }

  // Struct copy while the reference is still held.
  *cfg = pNode->raftCfg.cfg;
  syncNodeRelease(pNode);

  return 0;
}
142

143
void syncStop(int64_t rid) {
4,222,532✔
144
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,222,532✔
145
  if (pSyncNode != NULL) {
4,222,532✔
146
    pSyncNode->isStart = false;
4,222,532✔
147
    syncNodeRelease(pSyncNode);
4,222,532✔
148
    syncNodeRemove(rid);
4,222,532✔
149
  }
150
}
4,222,532✔
151

152
void syncPreStop(int64_t rid) {
4,222,532✔
153
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,222,532✔
154
  if (pSyncNode != NULL) {
4,222,532✔
155
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
4,222,532✔
UNCOV
156
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
UNCOV
157
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
158
    }
159
    syncNodePreClose(pSyncNode);
4,222,532✔
160
    syncNodeRelease(pSyncNode);
4,221,853✔
161
  }
162
}
4,222,532✔
163

164
void syncPostStop(int64_t rid) {
3,816,218✔
165
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
3,816,218✔
166
  if (pSyncNode != NULL) {
3,816,499✔
167
    syncNodePostClose(pSyncNode);
3,816,499✔
168
    syncNodeRelease(pSyncNode);
3,816,499✔
169
  }
170
}
3,816,499✔
171

172
/*
 * Validate a proposed configuration: this node must be part of it and the
 * replica count may differ from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  return syncNodeInConfig(pSyncNode, pCfg) && abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
}
176

177
/*
 * Apply a new replica configuration to the sync node identified by rid.
 *
 * Steps:
 *   1. Skip (return 0) when the node's lastConfigIndex is already at or past
 *      pNewCfg->lastIndex.
 *   2. Reject the config with TSDB_CODE_SYN_NEW_CONFIG_ERROR when
 *      syncNodeCheckNewConfig() fails or syncNodeDoConfigChange() reports an
 *      error.
 *   3. When the node is a leader or assigned leader, stop the heartbeat timer,
 *      re-initialise every per-peer heartbeat timer and start the timer again.
 *
 * Returns 0 on success or the first non-zero error code encountered. If the
 * node cannot be acquired, returns terrno when it is set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL.
 *
 * The reference taken by syncNodeAcquire() is released on every exit path,
 * and the node is only dereferenced while that reference is held.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // Log while the reference is still held, then release.
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _OVER;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != 0) goto _OVER;

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _OVER;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != 0) goto _OVER;

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) goto _OVER;
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != 0) goto _OVER;
  }

_OVER:
  // Single release point so no error path can leak the reference.
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
223

224
/*
 * Dispatch one sync RPC message to the handler matching pMsg->msgType.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED for an unknown
 * message type. If the sync module is not initialised or the node cannot be
 * acquired, returns terrno when it is set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL.
 *
 * The failure is logged before the node reference is released, so the node is
 * never dereferenced after syncNodeRelease().
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // Both timeout flavours share one handler.
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
293

294
int32_t syncLeaderTransfer(int64_t rid) {
4,222,532✔
295
  int32_t    code = 0;
4,222,532✔
296
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,222,532✔
297
  if (pSyncNode == NULL) {
4,222,532✔
298
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
299
    if (terrno != 0) code = terrno;
×
300
    TAOS_RETURN(code);
×
301
  }
302

303
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
4,222,532✔
304
  syncNodeRelease(pSyncNode);
4,222,532✔
305
  return ret;
4,222,532✔
306
}
307

308
/*
 * Force this node into follower state with an empty leader id, then answer
 * the request with a success response built from the request's own info.
 * Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId emptyId = {0};
  syncNodeBecomeFollower(ths, emptyId, "force election");

  // Zero-initialise, then fill in the fields the response carries.
  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
322

323
/*
 * Handle a "set assigned leader" request from the arbitrator.
 *
 * The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its
 * arbTerm is lower than the local one, or when it is not forced and its
 * member token differs from the local arbToken. Otherwise, if the node is not
 * already an assigned leader, the term is advanced, the node becomes assigned
 * leader and a noop entry is appended (a failure to append is only logged).
 *
 * A response carrying the resulting code is always sent. On success its body
 * is a serialized SVArbSetAssignedLeaderRsp; on any failure the body is empty
 * (pCont == NULL, contLen == 0).
 *
 * Returns the same code that was sent in the response.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // NOTE(review): the payload starts after SMsgHead but the full contLen is passed as its
  // length - confirm whether sizeof(SMsgHead) should be subtracted here.
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    // NOTE(review): failure is detected through terrno, which may be stale from an earlier
    // call - confirm raftStoreNextTerm() resets it on success.
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  // First pass with a NULL buffer only computes the encoded size.
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // never forward a non-positive length in the response
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // The buffer is gone: make sure the response below does not reference it.
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
405

406
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
407
  int32_t    code = 0;
×
408
  SSyncNode* pNode = syncNodeAcquire(rid);
×
409
  if (pNode == NULL) {
×
410
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
411
    if (terrno != 0) code = terrno;
×
412
    TAOS_RETURN(code);
×
413
  }
414

415
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
416
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
417
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
418

419
  syncNodeRelease(pNode);
×
420
  if (ret == 1) {
×
421
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
422
    code = rpcSendResponse(&rpcMsg);
×
423
    return code;
×
424
  } else {
425
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
426
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
427
  }
428
}
429

430
/*
 * Scan the match index of every peer and return the smallest one.
 *
 * The first peer examined seeds the result unconditionally; afterwards only
 * strictly positive indexes smaller than the current result replace it.
 * Returns SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));

    bool replace = (lowest == SYNC_INDEX_INVALID) || (current > 0 && current < lowest);
    if (replace) {
      lowest = current;
    }
    ++peer;
  }

  return lowest;
}
443

444
/*
 * Forward to the log store's syncLogIndexRetention callback with the given
 * byte budget and return its result.
 */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retained = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retained;
}
447

448
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
5,675,008✔
449
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,675,008✔
450
  int32_t    code = 0;
5,675,008✔
451
  if (pSyncNode == NULL) {
5,675,008✔
452
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
2,338✔
453
    if (terrno != 0) code = terrno;
2,338✔
454
    sError("sync begin snapshot error");
2,338✔
455
    TAOS_RETURN(code);
2,338✔
456
  }
457

458
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
5,672,670✔
459
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
5,672,670✔
460
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
5,672,670✔
461

462
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
5,672,150✔
463
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
27,769✔
464
    syncNodeRelease(pSyncNode);
27,769✔
465
    return 0;
27,769✔
466
  }
467

468
  int64_t logRetention = 0;
5,644,381✔
469

470
  if (syncNodeIsMnode(pSyncNode)) {
5,644,381✔
471
    // mnode
472
    logRetention = tsMndLogRetention;
772,968✔
473
  } else {
474
    // vnode
475
    if (pSyncNode->replicaNum > 1) {
4,871,933✔
476
      logRetention = SYNC_VNODE_LOG_RETENTION;
598,977✔
477
    }
478
  }
479

480
  if (pSyncNode->totalReplicaNum > 1) {
5,644,901✔
481
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
679,609✔
482
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
47,964✔
483
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
407✔
484
              lastApplyIndex);
485
      syncNodeRelease(pSyncNode);
407✔
486
      return 0;
407✔
487
    }
488
    SyncIndex retentionIndex =
679,202✔
489
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
679,202✔
490
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
679,202✔
491
  }
492

493
_DEL_WAL:
4,965,292✔
494

495
  do {
496
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
5,644,494✔
497
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
5,644,494✔
498
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
5,644,494✔
499
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
5,644,494✔
500
    if (lastApplyIndex <= walCommitVer) {
5,644,452✔
501
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
5,644,452✔
502

503
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
5,643,932✔
504
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
5,643,932✔
505
        pSyncNode->snapshottingTime = taosGetTimestampMs();
5,644,494✔
506

507
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
5,644,494✔
508
        if (code == 0) {
5,644,494✔
509
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
5,643,785✔
510
                  pSyncNode->snapshottingIndex, lastApplyIndex);
511
        } else {
512
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
709✔
513
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
514
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
709✔
515
        }
516

517
      } else {
518
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
519
                snapshottingIndex, lastApplyIndex);
520
      }
521
    }
522
  } while (0);
523

524
  syncNodeRelease(pSyncNode);
5,644,494✔
525
  TAOS_RETURN(code);
5,644,494✔
526
}
527

528
int32_t syncEndSnapshot(int64_t rid, bool forceTrim) {
5,671,961✔
529
  int32_t    code = 0;
5,671,961✔
530
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,671,961✔
531
  if (pSyncNode == NULL) {
5,671,754✔
532
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
533
    if (terrno != 0) code = terrno;
×
534
    sError("sync end snapshot error");
×
535
    TAOS_RETURN(code);
×
536
  }
537

538
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
5,671,754✔
539
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
5,643,603✔
540
    code = walEndSnapshot(pData->pWal, forceTrim);
5,643,785✔
541
    if (code != 0) {
5,643,785✔
542
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
543
      syncNodeRelease(pSyncNode);
×
544
      TAOS_RETURN(code);
×
545
    } else {
546
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
5,643,785✔
547
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
5,643,785✔
548
    }
549
  }
550

551
  syncNodeRelease(pSyncNode);
5,672,143✔
552
  TAOS_RETURN(code);
5,671,961✔
553
}
554

555
/*
 * Tell whether the node can serve reads: it must be a leader or assigned
 * leader and must have finished restoring.
 *
 * On a false result terrno is set to explain why:
 *   TSDB_CODE_SYN_INTERNAL_ERROR - pSyncNode is NULL
 *   TSDB_CODE_SYN_NOT_LEADER     - node is neither leader nor assigned leader
 *   TSDB_CODE_SYN_RESTORING      - restore has not finished
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool leads = (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!leads) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
574

575
bool syncIsReadyForRead(int64_t rid) {
809,579,216✔
576
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
809,579,216✔
577
  if (pSyncNode == NULL) {
809,677,855✔
578
    sError("sync ready for read error");
×
579
    return false;
×
580
  }
581

582
  bool ready = syncNodeIsReadyForRead(pSyncNode);
809,677,855✔
583

584
  syncNodeRelease(pSyncNode);
809,632,166✔
585
  return ready;
809,656,255✔
586
}
587

588
#ifdef BUILD_NO_CALL
589
bool syncSnapshotSending(int64_t rid) {
590
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
591
  if (pSyncNode == NULL) {
592
    return false;
593
  }
594

595
  bool b = syncNodeSnapshotSending(pSyncNode);
596
  syncNodeRelease(pSyncNode);
597
  return b;
598
}
599

600
bool syncSnapshotRecving(int64_t rid) {
601
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
602
  if (pSyncNode == NULL) {
603
    return false;
604
  }
605

606
  bool b = syncNodeSnapshotRecving(pSyncNode);
607
  syncNodeRelease(pSyncNode);
608
  return b;
609
}
610
#endif
611

612
/*
 * Pick a peer and propose a leader transfer to it.
 *
 * Returns 0 without doing anything when the node has no peers, is not the
 * leader, or has a single replica. The first peer is the default target;
 * with exactly two peers, the second one is chosen when its match index is
 * strictly greater. Otherwise returns the result of
 * syncNodeLeaderTransferTo().
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1)) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
633

634
/*
 * Build a leader-transfer message naming newLeader and propose it through
 * syncNodePropose().
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node has a single replica,
 * the error from syncBuildLeaderTransfer() when the message cannot be built
 * (via TAOS_CHECK_RETURN), otherwise the result of syncNodePropose(). The
 * message content is freed here after proposing.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  // Fill in the target of the transfer.
  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newNodeInfo = newLeader;

  int32_t result = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return result;
}
654

655
int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval) {
×
656
  int32_t code = 0;
×
657
  sInfo("sync Reset Timer, rid:%" PRId64, rid);
×
658
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
659
  if (pSyncNode == NULL) {
×
660
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
661
    if (terrno != 0) code = terrno;
×
662
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
663
    TAOS_RETURN(code);
×
664
  }
665
  pSyncNode->electBaseLine = electInterval;
×
666
  syncNodeResetElectTimer(pSyncNode);
×
667

668
  sInfo("vgId:%d, sync Reset Timer, rid:%" PRId64, pSyncNode->vgId, rid);
×
669
  code = syncNodeRestartHeartbeatTimer(pSyncNode, heartbeatInterval);
×
670

671
  syncNodeRelease(pSyncNode);
×
672
  return code;
×
673
}
674

675
SSyncState syncGetState(int64_t rid) {
786,225,351✔
676
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
786,225,351✔
677

678
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
786,225,351✔
679
  if (pSyncNode != NULL) {
786,240,480✔
680
    state.state = pSyncNode->state;
786,240,449✔
681
    state.roleTimeMs = pSyncNode->roleTimeMs;
786,239,204✔
682
    state.startTimeMs = pSyncNode->startTime;
786,237,200✔
683
    state.restored = pSyncNode->restoreFinish;
786,237,487✔
684
    if (pSyncNode->vgId != 1) {
786,238,783✔
685
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
127,265,958✔
686
    } else {
687
      state.canRead = state.restored;
658,966,680✔
688
    }
689
    state.totalIndex = pSyncNode->pLogBuf->totalIndex;
786,232,283✔
690

691
    double progress = 0;
786,231,079✔
692
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
786,231,079✔
693
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
688,403✔
694
      state.progress = (int32_t)(progress * 100);
688,403✔
695
    }
696
    else{
697
      state.progress = -1;
785,539,617✔
698
    }
699
    if (pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
786,228,020✔
700
      sInfo("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64
1,823,222✔
701
            ", "
702
            "progress:%lf, progress:%d",
703
            pSyncNode->vgId, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
704
    }
705

706
    state.term = raftStoreGetTerm(pSyncNode);
786,225,910✔
707
    state.snapSeq = pSyncNode->snapSeq;
786,239,730✔
708
    syncNodeRelease(pSyncNode);
786,240,489✔
709
  }
710

711
  return state;
786,237,169✔
712
}
713

714
SSyncMetrics syncGetMetrics(int64_t rid) {
×
715
  SSyncMetrics metrics = {0};
×
716

717
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
718
  if (pSyncNode != NULL) {
×
719
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
720
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
721
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
722
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
723
    syncNodeRelease(pSyncNode);
×
724
  }
725
  return metrics;
×
726
}
727

728
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
729
  if (pOldMetrics == NULL) return;
×
730

731
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
732
  if (pSyncNode != NULL) {
×
733
    // Atomically subtract the old metrics values from current metrics
734
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
735
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
736
    syncNodeRelease(pSyncNode);
×
737
  }
738
}
739

740
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
123,379,641✔
741
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
123,379,641✔
742
  if (pSyncNode != NULL) {
123,379,641✔
743
    *syncCommitIndex = pSyncNode->commitIndex;
123,379,641✔
744
    syncNodeRelease(pSyncNode);
123,379,641✔
745
  }
746
}
123,379,641✔
747

748
int32_t syncGetArbToken(int64_t rid, char* outToken) {
21,656,204✔
749
  int32_t    code = 0;
21,656,204✔
750
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
21,656,204✔
751
  if (pSyncNode == NULL) {
21,656,204✔
752
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
753
    if (terrno != 0) code = terrno;
×
754
    TAOS_RETURN(code);
×
755
  }
756

757
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
21,656,204✔
758
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
21,656,204✔
759
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
21,656,204✔
760
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
21,656,204✔
761

762
  syncNodeRelease(pSyncNode);
21,656,204✔
763
  TAOS_RETURN(code);
21,656,204✔
764
}
765

766
int32_t syncCheckSynced(int64_t rid) {
1,276✔
767
  int32_t    code = 0;
1,276✔
768
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,276✔
769
  if (pSyncNode == NULL) {
1,276✔
770
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
771
    if (terrno != 0) code = terrno;
×
772
    TAOS_RETURN(code);
×
773
  }
774

775
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
1,276✔
776
    code = TSDB_CODE_SYN_NOT_LEADER;
×
777
    syncNodeRelease(pSyncNode);
×
778
    TAOS_RETURN(code);
×
779
  }
780

781
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
1,276✔
782
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
1,276✔
783
  if (!isSync) {
1,276✔
784
    sInfo("vgId:%d, not synced, assignedCommitIndex:%" PRId64 ", commitIndex:%" PRId64, pSyncNode->vgId,
×
785
          pSyncNode->assignedCommitIndex, pSyncNode->commitIndex);
786
  }
787

788
  syncNodeRelease(pSyncNode);
1,276✔
789
  TAOS_RETURN(code);
1,276✔
790
}
791

792
// Raise the arbitration term stored on the sync node registered under `rid` to
// `arbTerm`; the stored term is never lowered (TMAX of old and new).
// Returns 0 on success, or terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL when the node
// cannot be acquired.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  int32_t    code = 0;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  pNode->arbTerm = TMAX(arbTerm, pNode->arbTerm);

  syncNodeRelease(pNode);
  TAOS_RETURN(code);
}
805

806
// Pick the config index to associate with a snapshot taken at `snapshotLastApplyIndex`.
// Starts from configIndexArr[0] and moves to every later entry that is larger than
// the current pick while not exceeding `snapshotLastApplyIndex`.
// Returns -2 and sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when the node has no
// recorded config index.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex chosen = pSyncNode->raftCfg.configIndexArr[0];

  int32_t pos = 0;
  while (pos < pSyncNode->raftCfg.configIndexCount) {
    SyncIndex candidate = pSyncNode->raftCfg.configIndexArr[pos];
    if (candidate > chosen && candidate <= snapshotLastApplyIndex) {
      chosen = candidate;
    }
    ++pos;
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         chosen);

  return chosen;
}
826

827
// Look up the raft id of the peer whose fqdn and port both match `pEp`.
// Returns EMPTY_RAFT_ID when no peer matches.
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    bool sameHost = (strcmp(pEp->fqdn, pSyncNode->peersNodeInfo[peer].nodeFqdn) == 0);
    if (sameHost && pEp->port == pSyncNode->peersNodeInfo[peer].nodePort) {
      return pSyncNode->peersId[peer];
    }
    ++peer;
  }

  return EMPTY_RAFT_ID;
}
836

837
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
43,076,966✔
838
  if (pEpSet == NULL || buffer == NULL) {
43,076,966✔
839
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
840
    return;
×
841
  }
842

843
  size_t offset = 0;
43,077,537✔
844
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
43,077,537✔
845

846
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
116,475,529✔
847
    if (offset >= bufferSize) break;
73,399,907✔
848
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
73,398,041✔
849
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
73,399,907✔
850
  }
851

852
  if (offset < bufferSize) {
43,081,503✔
853
    snprintf(buffer + offset, bufferSize - offset, "]");
43,080,693✔
854
  }
855
}
856

857
// Fill `pEpSet` with the endpoints of all non-learner replicas of the sync node
// registered under `rid`, and choose `inUse`:
//   - the slot whose fqdn/port equals the cached leader endpoint, if any;
//   - otherwise myIndex when this node is the cached leader;
//   - otherwise (myIndex + 1) % numOfEps.
// The set is then passed to epsetSort(). When the node cannot be acquired the
// set is returned with numOfEps == 0.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pNode->vgId, pNode->leaderCache.addr, pNode->leaderCacheEp.fqdn, pNode->leaderCacheEp.port);

  int leaderSlot = -1;  // slot in pEpSet matching the cached leader endpoint, -1 if none
  int slot = 0;         // next free slot in pEpSet->eps
  for (int32_t replica = 0; replica < pNode->raftCfg.cfg.totalReplicaNum; ++replica) {
    SNodeInfo* pInfo = &pNode->raftCfg.cfg.nodeInfo[replica];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;  // learners are not offered as retry targets
    }

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;

    SRaftId id = syncGetRaftIdByEp(pNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pNode->vgId, replica, id.addr, pEp->fqdn,
           pEp->port);

    // match on endpoint only; the raft id looked up above is used for logging
    bool samePort = (pEp->port == pNode->leaderCacheEp.port);
    if (samePort && strncmp(pEp->fqdn, pNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pNode->myRaftId.addr == pNode->leaderCache.addr && pNode->myRaftId.vgId == pNode->leaderCache.vgId) {
      pEpSet->inUse = pNode->raftCfg.cfg.myIndex;
    } else {
      pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char text[1024];
  epsetToString(pEpSet, text, sizeof(text));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pNode->vgId, pEpSet->numOfEps, text,
         pEpSet->inUse);
  syncNodeRelease(pNode);
}
905

906
// Public entry point for proposing `pMsg` on the sync node registered under `rid`.
// Acquires the node, forwards to syncNodePropose() and returns its result.
// When the node cannot be acquired, returns terrno if set, otherwise
// TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(code);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
920

921
int32_t syncCheckMember(int64_t rid) {
×
922
  int32_t    code = 0;
×
923
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
924
  if (pSyncNode == NULL) {
×
925
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
926
    if (terrno != 0) code = terrno;
×
927
    sError("sync propose error");
×
928
    TAOS_RETURN(code);
×
929
  }
930

931
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
932
    syncNodeRelease(pSyncNode);
×
933
    return TSDB_CODE_SYN_WRONG_ROLE;
×
934
  }
935

936
  syncNodeRelease(pSyncNode);
×
937
  return 0;
×
938
}
939

940
int32_t syncIsCatchUp(int64_t rid) {
2,134,329✔
941
  int32_t    code = 0;
2,134,329✔
942
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,134,329✔
943
  if (pSyncNode == NULL) {
2,134,329✔
944
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
945
    if (terrno != 0) code = terrno;
×
946
    sError("sync Node Acquire error since %d", ERRNO);
×
947
    TAOS_RETURN(code);
×
948
  }
949

950
  int32_t isCatchUp = 0;
2,134,329✔
951
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
2,134,329✔
952
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
378,552✔
953
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
378,552✔
954
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
2,026,487✔
955
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
956
          pSyncNode->pLogBuf->matchIndex);
957
    isCatchUp = 0;
2,026,487✔
958
  } else {
959
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
107,842✔
960
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
961
    isCatchUp = 1;
107,842✔
962
  }
963

964
  syncNodeRelease(pSyncNode);
2,134,329✔
965
  return isCatchUp;
2,134,329✔
966
}
967

968
ESyncRole syncGetRole(int64_t rid) {
2,134,329✔
969
  int32_t    code = 0;
2,134,329✔
970
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,134,329✔
971
  if (pSyncNode == NULL) {
2,134,329✔
972
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
973
    if (terrno != 0) code = terrno;
×
974
    sError("sync Node Acquire error since %d", ERRNO);
×
975
    TAOS_RETURN(code);
×
976
  }
977

978
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
2,134,329✔
979

980
  syncNodeRelease(pSyncNode);
2,134,329✔
981
  return role;
2,134,329✔
982
}
983

984
int64_t syncGetTerm(int64_t rid) {
8,641,469✔
985
  int32_t    code = 0;
8,641,469✔
986
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
8,641,469✔
987
  if (pSyncNode == NULL) {
8,641,469✔
988
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
989
    if (terrno != 0) code = terrno;
×
990
    sError("sync Node Acquire error since %d", ERRNO);
×
991
    TAOS_RETURN(code);
×
992
  }
993

994
  int64_t term = raftStoreGetTerm(pSyncNode);
8,641,469✔
995

996
  syncNodeRelease(pSyncNode);
8,641,469✔
997
  return term;
8,641,469✔
998
}
999

1000
// Propose `pMsg` on an already acquired sync node.
//   pSyncNode - node to propose on
//   pMsg      - client request; on the optimized path its info.conn.applyIndex /
//               applyTerm are filled in
//   isWeak    - forwarded to syncBuildClientRequest()
//   seq       - optional out: sequence number of the registered response stub
//               (normal path only)
// Returns:
//   1  - handled on the optimized single-replica path (replicaNum == 1)
//   0  - enqueued on the normal path, or optimized path taken while replicaNum != 1
//   TSDB_CODE_SYN_NOT_LEADER        - node is neither leader nor assigned leader
//   TSDB_CODE_SYN_RESTORING         - restore has not finished
//   TSDB_CODE_SYN_PROPOSE_NOT_READY - heartbeat replies timed out
//   TSDB_CODE_SYN_INTERNAL_ERROR    - optimized path request failed
//   other non-zero                  - serialization or enqueue failure
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_RESTORING;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never prints an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, serialize the request and enqueue it
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub, but keep reporting the serialization error: overwriting
    // `code` with the cleanup result would turn this failure into success (0).
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sWarn("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
            tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1072

1073
// Reset a peer heartbeat timer to its initial state for destination `destId`:
// no timer handle, counter 0, interval taken from the node's hbBaseLine,
// callback syncNodeEqPeerHeartbeatTimer, logic clock 0. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;

  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  pSyncTimer->destId = destId;
  pSyncTimer->timeStamp = taosGetTimestampMs();

  atomic_store_64(&pSyncTimer->logicClock, 0);

  sInfo("vgId:%d, HbTimer init, timerMs:%d for addr:0x%" PRIx64, pSyncNode->vgId, pSyncTimer->timerMS, destId.addr);

  return 0;
}
1084

1085
// Set the interval, in milliseconds, stored on a heartbeat timer.
static void syncHBSetTimerMS(SSyncTimer* pSyncTimer, int32_t ms) {
  pSyncTimer->timerMS = ms;
}
×
1086

1087
// Arm the peer heartbeat timer `pSyncTimer` of `pSyncNode`.
// Reuses the timer-data object referenced by pSyncTimer->hbDataRid when it can be
// acquired; otherwise allocates and registers a new one. The data object is filled
// with the node rid, destination, current logic clock and expected fire time, then
// the timer is (re)scheduled via taosTmrResetPriority().
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is
// not initialized, or an out-of-memory code when the data object cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return code;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // allocation failed: report it instead of dereferencing a NULL pointer
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, tstrerror(code));
      return code;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sInfo("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
        pData->destId.addr, pSyncTimer->timerMS);

  // the data object's rid, not its pointer, is handed to the timer callback
  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sWarn("vgId:%d, reset hb timer stopped:%d", pSyncNode->vgId, stopped);
  }

  return code;
}
1119

1120
// Disarm the peer heartbeat timer `pSyncTimer`: bump its logic clock, stop the
// underlying timer, clear the handle, and remove the associated timer-data
// object (hbDataRid is reset to -1). Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool stop = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stop);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1130

1131
// Restore the log store from the FSM snapshot when the two are inconsistent.
// A restore is triggered when the snapshot's lastApplyIndex lies outside
// [firstVer - 1, lastVer] of the log store, or when fsmState is
// SYNC_FSM_STATE_INCOMPLETE.
// Returns 0 when nothing had to be done or the restore succeeded,
// TSDB_CODE_SYN_INTERNAL_ERROR when the log store, FSM or its FpGetSnapshotInfo
// callback is missing, or the error code of syncLogRestoreFromSnapshot().
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0;
  SSnapshot snap = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);

  SyncIndex commitIndex = snap.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool outOfRange = (lastVer < commitIndex) || (firstVer > commitIndex + 1);
  if (!outOfRange && pNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    TAOS_RETURN(code);
  }

  sInfo("vgId:%d, restore log store from snapshot, firstVer:%" PRId64 ", lastVer:%" PRId64 ", commitIndex:%" PRId64,
        pNode->vgId, firstVer, lastVer, commitIndex);
  code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
  }
  TAOS_RETURN(code);
}
1163

1164
// open/close --------------
1165
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t electInterval, int32_t heartbeatInterval) {
4,221,952✔
1166
  int32_t    code = 0;
4,221,952✔
1167
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
4,221,952✔
1168
  if (pSyncNode == NULL) {
4,222,279✔
1169
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1170
    goto _error;
×
1171
  }
1172

1173
  if (!taosDirExist((char*)(pSyncInfo->path))) {
4,222,279✔
1174
    if (taosMkDir(pSyncInfo->path) != 0) {
3,050,471✔
1175
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1176
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
×
1177
      goto _error;
×
1178
    }
1179
  }
1180

1181
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
4,222,430✔
1182
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
4,222,593✔
1183
           TD_DIRSEP);
1184
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
4,222,430✔
1185

1186
  if (!taosCheckExistFile(pSyncNode->configPath)) {
4,222,397✔
1187
    // create a new raft config file
1188
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
3,050,471✔
1189
    pSyncNode->vgId = pSyncInfo->vgId;
3,050,830✔
1190
    pSyncNode->mountVgId = pSyncInfo->mountVgId;
3,050,471✔
1191
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
3,050,471✔
1192
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
3,050,471✔
1193
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
3,050,471✔
1194
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
3,050,471✔
1195
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
3,050,471✔
1196
    pSyncNode->raftCfg.configIndexCount = 1;
3,050,471✔
1197
    pSyncNode->raftCfg.configIndexArr[0] = -1;
3,050,471✔
1198

1199
    if ((code = syncWriteCfgFile(pSyncNode, "new")) != 0) {
3,050,471✔
1200
      terrno = code;
×
1201
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
×
1202
      goto _error;
×
1203
    }
1204
  } else {
1205
    // update syncCfg by raft_config.json
1206
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
1,172,122✔
1207
      terrno = code;
×
1208
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
×
1209
      goto _error;
×
1210
    }
1211

1212
    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
1,172,122✔
1213
      sInfo("vgId:%d, is going to judge update, in SyncInfo, totalReplicaNum:%d", pSyncNode->vgId,
865,996✔
1214
            pSyncInfo->syncCfg.totalReplicaNum);
1215
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
865,996✔
1216
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
779,696✔
1217
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
779,696✔
1218
        if ((code = syncWriteCfgFile(pSyncNode, "changed")) != 0) {
779,696✔
1219
          terrno = code;
×
1220
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
×
1221
          goto _error;
×
1222
        }
1223
      } else {
1224
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
86,300✔
1225
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
86,300✔
1226
      }
1227
    } else {
1228
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
305,855✔
1229
            pSyncInfo->syncCfg.changeVersion);
1230
    }
1231
  }
1232

1233
  // init by SSyncInfo
1234
  pSyncNode->vgId = pSyncInfo->vgId;
4,222,593✔
1235
  pSyncNode->mountVgId = pSyncInfo->mountVgId;
4,222,593✔
1236
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
4,222,549✔
1237
  bool      updated = false;
4,222,281✔
1238
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
4,222,281✔
1239
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
1240
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
11,309,216✔
1241
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
7,086,623✔
1242
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
7,086,623✔
1243
      updated = true;
×
1244
    }
1245
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
7,086,623✔
1246
          pNode->nodeId, pNode->clusterId);
1247
  }
1248

1249
  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
4,222,593✔
1250
    if (updated) {
406,094✔
1251
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
×
1252
      if ((code = syncWriteCfgFile(pSyncNode, "updated")) != 0) {
×
1253
        terrno = code;
×
1254
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
×
1255
        goto _error;
×
1256
      }
1257
    }
1258
  }
1259

1260
  pSyncNode->pWal = pSyncInfo->pWal;
4,222,593✔
1261
  pSyncNode->msgcb = pSyncInfo->msgcb;
4,222,593✔
1262
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
4,222,593✔
1263
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
4,222,593✔
1264
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
4,222,593✔
1265

1266
  // create raft log ring buffer
1267
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
4,222,593✔
1268
  if (pSyncNode->pLogBuf == NULL) {
4,222,593✔
1269
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
×
1270
    goto _error;
×
1271
  }
1272

1273
  // init replicaNum, replicasId
1274
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
4,222,593✔
1275
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
4,222,593✔
1276
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
11,309,167✔
1277
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
7,086,623✔
1278
        false) {
1279
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
×
1280
      goto _error;
×
1281
    }
1282
  }
1283

1284
  // init internal
1285
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
4,222,593✔
1286
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
4,222,593✔
1287

1288
  // init peersNum, peers, peersId
1289
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
4,222,593✔
1290
  int32_t j = 0;
4,222,593✔
1291
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
11,307,954✔
1292
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
7,085,992✔
1293
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
2,864,045✔
1294
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
2,863,414✔
1295
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
2,864,045✔
1296
      j++;
2,863,414✔
1297
    }
1298
  }
1299

1300
  pSyncNode->arbTerm = -1;
4,222,593✔
1301
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
4,222,593✔
1302
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
4,222,593✔
1303
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
4,222,593✔
1304

1305
  // init raft algorithm
1306
  pSyncNode->pFsm = pSyncInfo->pFsm;
4,222,593✔
1307
  pSyncInfo->pFsm = NULL;
4,222,593✔
1308
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
4,222,593✔
1309
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
4,222,593✔
1310
  pSyncNode->leaderCacheEp.port = 0;
4,222,593✔
1311
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';
4,222,593✔
1312

1313
  // init life cycle outside
1314

1315
  // TLA+ Spec
1316
  // InitHistoryVars == /\ elections = {}
1317
  //                    /\ allLogs   = {}
1318
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
1319
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
1320
  //                   /\ state       = [i \in Server |-> Follower]
1321
  //                   /\ votedFor    = [i \in Server |-> Nil]
1322
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
1323
  //                      /\ votesGranted   = [i \in Server |-> {}]
1324
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
1325
  // \* leader does not send itself messages. It's still easier to include these
1326
  // \* in the functions.
1327
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
1328
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
1329
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
1330
  //                /\ commitIndex  = [i \in Server |-> 0]
1331
  // Init == /\ messages = [m \in {} |-> 0]
1332
  //         /\ InitHistoryVars
1333
  //         /\ InitServerVars
1334
  //         /\ InitCandidateVars
1335
  //         /\ InitLeaderVars
1336
  //         /\ InitLogVars
1337
  //
1338

1339
  // init TLA+ server vars
1340
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
4,222,593✔
1341
  pSyncNode->roleTimeMs = taosGetTimestampMs();
4,222,593✔
1342
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
4,222,593✔
1343
    terrno = code;
×
1344
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
×
1345
    goto _error;
×
1346
  }
1347

1348
  // init TLA+ candidate vars
1349
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
4,222,593✔
1350
  if (pSyncNode->pVotesGranted == NULL) {
4,222,593✔
1351
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
×
1352
    goto _error;
×
1353
  }
1354
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
4,221,605✔
1355
  if (pSyncNode->pVotesRespond == NULL) {
4,222,593✔
1356
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
×
1357
    goto _error;
×
1358
  }
1359

1360
  // init TLA+ leader vars
1361
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
4,222,078✔
1362
  if (pSyncNode->pNextIndex == NULL) {
4,221,453✔
1363
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1364
    goto _error;
×
1365
  }
1366
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
4,221,453✔
1367
  if (pSyncNode->pMatchIndex == NULL) {
4,222,593✔
1368
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1369
    goto _error;
×
1370
  }
1371

1372
  // init TLA+ log vars
1373
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
4,221,856✔
1374
  if (pSyncNode->pLogStore == NULL) {
4,222,593✔
1375
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
×
1376
    goto _error;
×
1377
  }
1378

1379
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
4,222,593✔
1380
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
4,222,593✔
1381
    SSnapshot snapshot = {0};
4,222,593✔
1382
    // TODO check return value
1383
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
4,222,593✔
1384
    if (snapshot.lastApplyIndex > commitIndex) {
4,222,345✔
1385
      commitIndex = snapshot.lastApplyIndex;
666,086✔
1386
      sNTrace(pSyncNode, "reset commit index by snapshot");
666,086✔
1387
    }
1388
    pSyncNode->fsmState = snapshot.state;
4,222,345✔
1389
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
4,221,253✔
1390
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
×
1391
      if (pSyncNode->replicaNum == 1) {
×
1392
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1393
        goto _error;
×
1394
      }
1395
    }
1396
  }
1397
  pSyncNode->commitIndex = commitIndex;
4,220,805✔
1398
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
4,221,608✔
1399

1400
  // restore log store on need
1401
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
4,223,914✔
1402
    terrno = code;
×
1403
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
×
1404
    goto _error;
×
1405
  }
1406

1407
  // timer ms init
1408
  pSyncNode->pingBaseLine = PING_TIMER_MS;
4,222,549✔
1409
  pSyncNode->electBaseLine = electInterval;
4,222,593✔
1410
  pSyncNode->hbBaseLine = heartbeatInterval;
4,222,549✔
1411

1412
  // init ping timer
1413
  pSyncNode->pPingTimer = NULL;
4,222,593✔
1414
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
4,222,593✔
1415
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
4,222,593✔
1416
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
4,222,593✔
1417
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
4,222,593✔
1418
  pSyncNode->pingTimerCounter = 0;
4,222,593✔
1419

1420
  // init elect timer
1421
  pSyncNode->pElectTimer = NULL;
4,222,549✔
1422
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
4,222,593✔
1423
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
4,222,593✔
1424
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
4,222,593✔
1425
  pSyncNode->electTimerCounter = 0;
4,222,593✔
1426

1427
  // init heartbeat timer
1428
  pSyncNode->pHeartbeatTimer = NULL;
4,222,593✔
1429
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
4,222,593✔
1430
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
4,222,593✔
1431
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
4,222,593✔
1432
#ifdef BUILD_NO_CALL
1433
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
1434
#endif
1435
  pSyncNode->heartbeatTimerCounter = 0;
4,222,593✔
1436

1437
  // init peer heartbeat timer
1438
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
67,561,488✔
1439
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
63,338,895✔
1440
      terrno = code;
×
1441
      goto _error;
×
1442
    }
1443
  }
1444

1445
  // tools
1446
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
4,222,593✔
1447
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1448
    goto _error;
×
1449
  }
1450
  if (pSyncNode->pSyncRespMgr == NULL) {
4,222,593✔
1451
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1452
    goto _error;
×
1453
  }
1454

1455
  // restore state
1456
  pSyncNode->restoreFinish = false;
4,222,593✔
1457

1458
  // snapshot senders
1459
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
67,559,106✔
1460
    SSyncSnapshotSender* pSender = NULL;
63,336,513✔
1461
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
63,336,241✔
1462
    if (pSender == NULL) return NULL;
63,336,940✔
1463

1464
    pSyncNode->senders[i] = pSender;
63,336,940✔
1465
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
63,337,259✔
1466
  }
1467

1468
  // snapshot receivers
1469
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
4,222,593✔
1470
  if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
4,222,593✔
1471
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
4,222,593✔
1472
          pSyncNode->pNewNodeReceiver);
1473

1474
  // is config changing
1475
  pSyncNode->changing = false;
4,222,593✔
1476

1477
  // replication mgr
1478
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
4,222,593✔
1479
    terrno = code;
×
1480
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
×
1481
    goto _error;
×
1482
  }
1483

1484
  // peer state
1485
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
4,222,593✔
1486
    terrno = code;
×
1487
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
×
1488
    goto _error;
×
1489
  }
1490

1491
  //
1492
  // min match index
1493
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
4,222,593✔
1494

1495
  // start in syncNodeStart
1496
  // start raft
1497

1498
  int64_t timeNow = taosGetTimestampMs();
4,222,593✔
1499
  pSyncNode->startTime = timeNow;
4,222,593✔
1500
  pSyncNode->lastReplicateTime = timeNow;
4,222,593✔
1501

1502
  // snapshotting
1503
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
4,222,593✔
1504

1505
  // init log buffer
1506
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
4,222,593✔
1507
    terrno = code;
×
1508
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
×
1509
    goto _error;
×
1510
  }
1511

1512
  pSyncNode->isStart = true;
4,221,883✔
1513
  pSyncNode->electNum = 0;
4,221,883✔
1514
  pSyncNode->becomeLeaderNum = 0;
4,221,883✔
1515
  pSyncNode->becomeAssignedLeaderNum = 0;
4,221,883✔
1516
  pSyncNode->configChangeNum = 0;
4,221,883✔
1517
  pSyncNode->hbSlowNum = 0;
4,221,883✔
1518
  pSyncNode->hbrSlowNum = 0;
4,221,883✔
1519
  pSyncNode->tmrRoutineNum = 0;
4,221,883✔
1520

1521
  pSyncNode->snapSeq = -1;
4,221,883✔
1522

1523
  sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode,
4,221,883✔
1524
         pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout);
1525
  return pSyncNode;
4,222,593✔
1526

UNCOV
1527
_error:
×
UNCOV
1528
  if (pSyncInfo->pFsm) {
×
1529
    taosMemoryFree(pSyncInfo->pFsm);
×
1530
    pSyncInfo->pFsm = NULL;
×
1531
  }
1532
  syncNodeClose(pSyncNode);
×
1533
  pSyncNode = NULL;
×
1534
  return NULL;
×
1535
}
1536

1537
#ifdef BUILD_NO_CALL
1538
// Raise the node's commitIndex to the FSM snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing if the node has no FSM or the FSM has no
// FpGetSnapshotInfo callback. The callback's return value is ignored.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL) return;
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) return;

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
1547
#endif
1548

1549
// Restore a sync node from its log store and log buffer.
//
// Steps:
//   1. Fail with TSDB_CODE_SYN_INTERNAL_ERROR if the log store or the log
//      buffer has not been created.
//   2. Under the log buffer mutex, read the log store's last index and commit
//      index and the log buffer's endIndex.
//   3. Warn if the buffer's endIndex does not directly follow the last log
//      index (the condition is only recorded, not returned immediately).
//   4. Raise pSyncNode->commitIndex to at least the log store's commit index.
//   5. Unless the FSM state is SYNC_FSM_STATE_INCOMPLETE, commit the log buffer
//      up to commitIndex.
//
// Returns the result of the commit in step 5. When step 5 is skipped, returns
// TSDB_CODE_WAL_LOG_INCOMPLETE if step 3 fired, otherwise 0.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Report the code assigned above; terrno may hold an unrelated, stale error here.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // Deliberately no early return: restore continues after the warning.
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore");
  }

  TAOS_RETURN(code);
}
1584

1585
// Start raft on an opened sync node and pick its initial role:
//   - configured as learner      -> become learner
//   - single replica             -> advance the term, become leader, append a noop
//   - otherwise                  -> become follower with an empty leader id
// Then start the ping timer. Returns early if appending the noop fails;
// otherwise returns the result of starting the ping timer.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  const SNodeInfo* pSelf = &pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (pSelf->nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t rc = syncNodeStartPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(rc));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return rc;
}
1611

1612
#ifdef BUILD_NO_CALL
1613
// Put the node into follower state as a stand-by member: record the role
// change time, stop the heartbeat timers, restart the elect timer with
// TIMER_MAX_MS, and start the ping timer.
// Returns -1 if restarting the elect timer or starting the ping timer reports
// a negative code; returns early if stopping the heartbeat timers fails.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t rc = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (rc < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  rc = syncNodeStartPingTimer(pSyncNode);
  if (rc < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return rc;
}
1636
#endif
1637

1638
// First phase of closing a sync node: stop the elect, heartbeat and ping
// timers (in that order) and clean the pending responses.
// Logs and returns without doing anything when the node, its FSM, or the FSM's
// FpApplyQueueItems callback is NULL. Also returns as soon as stopping one of
// the timers fails.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1674

1675
// Release the node's new-node snapshot receiver, if it has one: stop it when
// it is running, destroy it, and clear the pointer.
// A NULL node is ignored, matching syncNodePreClose and syncNodeClose.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  if (pSyncNode->pNewNodeReceiver == NULL) return;

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // The message names this phase (post close); it previously said "preclose".
  sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
3,816,499✔
1687

1688
// Release a heartbeat timer data object with taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
965,280✔
1689

1690
// Tear down a sync node and free it.
//
// Order: clean pending responses, stop the ping/elect/heartbeat timers, destroy
// the replication managers, response manager, vote and index managers, log
// store and log buffer, destroy the arb token mutex, destroy every snapshot
// sender and the new-node receiver, free the FSM, close the raft store, and
// finally free the node itself.
//
// A NULL node is ignored. If stopping any timer fails, the failure is logged
// and the function returns before releasing anything else.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  // each owned component is destroyed and its pointer cleared right away
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders: stop the running ones, then destroy
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver for a newly added node
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1759

1760
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1761

1762
// timer control --------------
1763
// (Re)arm the ping timer with the node's pingTimerMS and align the timer's
// logic clock with the user clock. When the sync environment is not
// initialized only an error is logged. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d, stopped:%d", pSyncNode->vgId, pSyncNode->pingTimerMS,
           stopped);
  }
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);

  return 0;
}
1778

1779
// Stop the ping timer: bump the user logic clock, stop the timer handle and
// clear it. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool stop = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pPingTimer = NULL;

  return 0;
}
1787

1788
// Arm the election timer to fire after `ms` milliseconds.
// Records the timeout, fills electTimerParam (execute time, logic clock, node,
// data) and resets the timer. When the sync environment is not initialized
// only an error is logged. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }

  return 0;
}
1807

1808
// Stop the election timer: bump its logic clock, stop the timer handle and
// clear it. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool stop = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pElectTimer = NULL;

  return 0;
}
1817

1818
// Stop the election timer and start it again with a timeout of `ms`
// milliseconds. Returns early if either step fails, otherwise 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1824

1825
// Restart the election timer with a fresh timeout: TIMER_MAX_MS for a stand-by
// node, otherwise a random value drawn from [electBaseLine, 2 * electBaseLine]
// via syncUtilElectRandomMS. A restart failure is logged as a warning.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t timeoutMs;
  if (pSyncNode->raftCfg.isStandBy) {
    timeoutMs = TIMER_MAX_MS;
  } else {
    timeoutMs = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  }

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, timeoutMs);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          timeoutMs);
}
1843

1844
#ifdef BUILD_NO_CALL
1845
// (Re)arm the node-level heartbeat timer with heartbeatTimerMS and align the
// timer's logic clock with the user clock. When the sync environment is not
// initialized only an error is logged. Always returns 0.
//
// taosTmrReset reports a bool "stopped" flag rather than an error code, so it
// is logged as a warning (as the elect timer does) instead of being returned
// through TAOS_CHECK_RETURN.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sWarn("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1858
#endif
1859

1860
// Start the per-peer heartbeat timer of every peer that has one.
// Returns early if starting a timer fails, otherwise 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  // disabled: legacy single node-level heartbeat timer
#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return ret;
}
1877

1878
// Apply a new interval of `ms` milliseconds to the heartbeat timer of every
// peer that has one. Always returns 0.
int32_t syncNodeSetHeartbeatTimerMs(SSyncNode* pSyncNode, int32_t ms) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    syncHBSetTimerMS(pHbTimer, ms);
  }

  return 0;
}
1890

1891
// Stop the per-peer heartbeat timer of every peer that has one.
// Returns early if stopping a timer fails, otherwise 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // disabled: legacy single node-level heartbeat timer
#if 0
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return code;
}
1910

1911
// Apply a new heartbeat interval to every peer timer and, if this node is
// currently the leader, stop and start the heartbeat timers so the new
// interval is used. Returns early if any step fails, otherwise 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode, int32_t heartbeatInterval) {
  sInfo("vgId:%d, sync Node Restart HeartbeatTimer, state=%d", pSyncNode->vgId, pSyncNode->state);

  TAOS_CHECK_RETURN(syncNodeSetHeartbeatTimerMs(pSyncNode, heartbeatInterval));

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1922

1923
// Send a sync message to the peer whose raft address equals destRaftId->addr.
//
// The peer's endpoint set is looked up in pNode->peersId / pNode->peersEpset.
// If it is found and the node has a send callback, the message content is
// converted with syncUtilMsgHtoN, marked noResp, and handed to the callback.
// On a negative result (including "no peer found" or "no callback", which
// yield -1) the error is logged and pMsg->pCont is freed and cleared.
// Returns the callback's result, or -1 when nothing was sent.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // locate the first peer with a matching address
  SEpSet* pTarget = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum && pTarget == NULL; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pTarget = &pNode->peersEpset[peer];
    }
  }

  int32_t rc = -1;
  if (pNode->syncSendMSg != NULL && pTarget != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    rc = pNode->syncSendMSg(pTarget, pMsg);
  }

  if (rc < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(rc),
           pTarget, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  TAOS_RETURN(rc);
}
1948

1949
// Tell whether this node is a member of the given configuration.
//
// Membership is checked twice: by fqdn + port against pNode->myNodeInfo, and by
// raft id (SYNC_ADDR of the entry plus the node's vgId) against
// pNode->myRaftId. If the two checks disagree, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  // check 1: same fqdn and port
  bool matchedByEndpoint = false;
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum && !matchedByEndpoint; ++idx) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[idx];
    if (strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 && pInfo->nodePort == pNode->myNodeInfo.nodePort) {
      matchedByEndpoint = true;
    }
  }

  // check 2: same raft id
  bool matchedByRaftId = false;
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum && !matchedByRaftId; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };
    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      matchedByRaftId = true;
    }
  }

  if (matchedByEndpoint != matchedByRaftId) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return matchedByEndpoint;
}
1979

1980
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
1,129,994✔
1981
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1,129,994✔
1982
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
616,671✔
1983
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1,635,775✔
1984
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
1,330,883✔
1985
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
1,330,883✔
1986
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
1,330,883✔
1987
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
1,330,883✔
1988
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1,330,883✔
1989
  }
1990

1991
  return false;
304,892✔
1992
}
1993

1994
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
350,298✔
1995
  int32_t  code = 0;
350,298✔
1996
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
350,298✔
1997
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
350,298✔
1998
    sInfo("vgId:1, sync not reconfig since not changed");
304,892✔
1999
    return 0;
304,892✔
2000
  }
2001

2002
  pSyncNode->raftCfg.cfg = *pNewConfig;
45,406✔
2003
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;
45,406✔
2004

2005
  pSyncNode->configChangeNum++;
45,406✔
2006

2007
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
45,406✔
2008
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
45,406✔
2009

2010
  bool isDrop = false;
45,406✔
2011
  bool isAdd = false;
45,406✔
2012

2013
  if (IamInOld && !IamInNew) {
45,406✔
2014
    isDrop = true;
×
2015
  } else {
2016
    isDrop = false;
45,406✔
2017
  }
2018

2019
  if (!IamInOld && IamInNew) {
45,406✔
2020
    isAdd = true;
×
2021
  } else {
2022
    isAdd = false;
45,406✔
2023
  }
2024

2025
  // log begin config change
2026
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
45,406✔
2027
         pSyncNode->vgId, oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex,
2028
         pNewConfig->lastIndex);
2029

2030
  if (IamInNew) {
45,406✔
2031
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
45,406✔
2032
  }
2033
  if (isDrop) {
45,406✔
2034
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
×
2035
  }
2036

2037
  // add last config index
2038
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
45,406✔
2039
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
45,406✔
2040
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
×
2041
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
2042
    return -1;
×
2043
  }
2044

2045
  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
45,406✔
2046
  pCfg->configIndexCount++;
45,406✔
2047

2048
  if (IamInNew) {
45,406✔
2049
    //-----------------------------------------
2050
    int32_t ret = 0;
45,406✔
2051

2052
    // save snapshot senders
2053
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
45,406✔
2054
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
45,406✔
2055
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
45,406✔
2056
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
726,496✔
2057
      oldSenders[i] = pSyncNode->senders[i];
681,090✔
2058
      sSTrace(oldSenders[i], "snapshot sender save old");
681,090✔
2059
    }
2060

2061
    // init internal
2062
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
45,406✔
2063
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;
45,406✔
2064

2065
    // init peersNum, peers, peersId
2066
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
45,406✔
2067
    int32_t j = 0;
45,406✔
2068
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
165,228✔
2069
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
119,822✔
2070
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
74,416✔
2071
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
74,416✔
2072
        j++;
74,416✔
2073
      }
2074
    }
2075
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
119,822✔
2076
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
74,416✔
2077
        return terrno;
×
2078
    }
2079

2080
    // init replicaNum, replicasId
2081
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
45,406✔
2082
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
45,406✔
2083
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
165,228✔
2084
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
119,822✔
2085
          false)
2086
        return terrno;
×
2087
    }
2088

2089
    // update quorum first
2090
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
45,406✔
2091

2092
    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
45,406✔
2093
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
45,406✔
2094
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
45,406✔
2095
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
45,406✔
2096

2097
    // reset snapshot senders
2098

2099
    // clear new
2100
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
726,496✔
2101
      pSyncNode->senders[i] = NULL;
681,090✔
2102
    }
2103

2104
    // reset new
2105
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
165,228✔
2106
      // reset sender
2107
      bool reset = false;
119,822✔
2108
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
517,473✔
2109
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
495,527✔
2110
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
97,876✔
2111
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);
2112

2113
          pSyncNode->senders[i] = oldSenders[j];
97,876✔
2114
          oldSenders[j] = NULL;
97,876✔
2115
          reset = true;
97,876✔
2116

2117
          // reset replicaIndex
2118
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
97,876✔
2119
          pSyncNode->senders[i]->replicaIndex = i;
97,876✔
2120

2121
          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
97,876✔
2122
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);
2123

2124
          break;
97,876✔
2125
        }
2126
      }
2127
    }
2128

2129
    // create new
2130
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
726,496✔
2131
      if (pSyncNode->senders[i] == NULL) {
681,090✔
2132
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
583,214✔
2133
        if (pSyncNode->senders[i] == NULL) {
583,214✔
2134
          // will be created later while send snapshot
2135
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
×
2136
        } else {
2137
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
583,214✔
2138
        }
2139
      } else {
2140
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
97,876✔
2141
      }
2142
    }
2143

2144
    // free old
2145
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
726,496✔
2146
      if (oldSenders[i] != NULL) {
681,090✔
2147
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
583,214✔
2148
        snapshotSenderDestroy(oldSenders[i]);
583,214✔
2149
        oldSenders[i] = NULL;
583,214✔
2150
      }
2151
    }
2152

2153
    // persist cfg
2154
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode, "config_change_with_new_members"));
45,406✔
2155
  } else {
2156
    // persist cfg
2157
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode, "config_change_no_new_members"));
×
2158
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
×
2159
  }
2160

2161
_END:
×
2162
  // log end config change
2163
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
45,406✔
2164
  return 0;
45,406✔
2165
}
2166

2167
// raft state change --------------
2168
// Advance the persisted term to `term` when it is newer than the stored one.
// The node's role (pSyncNode->state) is not modified here.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;  // stored term is already up to date
  }
  raftStoreSetTerm(pSyncNode, term);
}
222,617✔
2173

2174
// Step down in reaction to a message that carries `newTerm`.
//   pSyncNode - node that may step down
//   newTerm   - term observed from `id`; the call is a no-op when it is older than the stored term
//   id        - raft id forwarded to syncNodeBecomeFollower (becomes the leader cache)
//   strFrom   - short tag describing the trigger; embedded in the follower debug string
// If newTerm is greater than the stored term, the term is updated, the node becomes follower and
// the vote is cleared. If the terms are equal, the node becomes follower only when it is not one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id, char* strFrom) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  // An assigned leader regenerates its arb token (under arbTokenMutex) before leaving that role.
  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, generate arb token, will step down from assigned leader, new arbToken:%s", pSyncNode->vgId,
          pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  // Large enough for the fixed text, two 64-bit terms and a reasonably long `strFrom`;
  // the previous 64-byte buffer truncated the message once the terms grew.
  char reason[128];

  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    snprintf(reason, sizeof(reason), "step down, with same term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, reason);
  }
}
2209

2210
// Hand the node's response manager to syncRespCleanRsp.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
2,068,927✔
2211

2212
// Switch the node to the FOLLOWER role.
//   pSyncNode - node to update
//   leaderId  - raft id stored as the new leader cache
//   debugStr  - free-form reason, only used in log output
// Returns early (after logging) if the heartbeat timer cannot be stopped or the log buffer cannot be reset.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  // a former leader drops its own cached leader endpoint first
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;

  pSyncNode->leaderCache = leaderId;

  // refresh the cached leader endpoint when leaderId is one of the known replicas
  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
    if (syncUtilSameId(&pSyncNode->replicasId[i], &leaderId)) {
      pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[i].nodePort;
      // tstrncpy (unlike strncpy) always leaves the destination NUL-terminated
      tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
      break;
    }
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2263

2264
// Switch the node to the LEARNER role.
//   pSyncNode - node to update
//   debugStr  - free-form reason, only used in the trace log
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // role transition
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  // notify the state machine, if it registered a callback
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // the log buffer is reset last; a failure is only logged
  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2289

2290
// TLA+ Spec
2291
// \* Candidate i transitions to leader.
2292
// BecomeLeader(i) ==
2293
//     /\ state[i] = Candidate
2294
//     /\ votesGranted[i] \in Quorum
2295
//     /\ state'      = [state EXCEPT ![i] = Leader]
2296
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2297
//                          [j \in Server |-> Len(log[i]) + 1]]
2298
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2299
//                          [j \in Server |-> 0]]
2300
//     /\ elections'  = elections \cup
2301
//                          {[eterm     |-> currentTerm[i],
2302
//                            eleader   |-> i,
2303
//                            elog      |-> log[i],
2304
//                            evotes    |-> votesGranted[i],
2305
//                            evoterLog |-> voterLog[i]]}
2306
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2307
//
2308
// Switch the node to the LEADER role and start the leader-side machinery.
//   pSyncNode - node to update
//   debugStr  - free-form reason, only used in the final info log
// Steps, in order: bump counters, change state, cache itself as leader, reset next/match indexes,
// init peer state, stop a running snapshot receiver, stop the elect timer, start the heartbeat timer,
// send a heartbeat, invoke the FSM callback, reset the log buffer.
// Any failing step is logged and aborts the remaining steps.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache; tstrncpy (unlike strncpy) always leaves the destination NUL-terminated
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
           TSDB_FQDN_LEN);
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;

  // The last index does not change while the loop runs, so look it up once
  // instead of once per replica (this also avoids shadowing `code`).
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "node become leader, %s", debugStr);
}
2402

2403
// Switch the node to the ASSIGNED_LEADER role and start the leader-side machinery.
// Same sequence as syncNodeBecomeLeader, except that restoreFinish is left untouched.
// Any failing step is logged and aborts the remaining steps.
// NOTE(review): leaderCacheEp is not refreshed here, unlike syncNodeBecomeLeader; confirm that is intended.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The last index does not change while the loop runs, so look it up once
  // instead of once per replica (this also avoids shadowing `code`).
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2481

2482
// Promote a CANDIDATE that holds a majority of votes to LEADER and append a noop entry.
// Logs and returns without changes when the node is not a candidate or lacks a majority.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "from candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // a failed noop append is logged only; the transition itself is not rolled back
  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
}
2506

2507
// True when the node's vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}
63,980,414✔
2508

2509
int32_t syncSetElectBaseline(int64_t rid, int32_t ms){
24,578✔
2510
  int32_t code = 0;
24,578✔
2511
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
24,578✔
2512
  if (pSyncNode == NULL) {
24,578✔
2513
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
2514
    if (terrno != 0) code = terrno;
×
2515
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
2516
    TAOS_RETURN(code);
×
2517
  }
2518
  pSyncNode->electBaseLine = ms;
24,578✔
2519
  syncNodeResetElectTimer(pSyncNode);
24,578✔
2520
  return code;
24,578✔
2521
}
2522

2523
// Reset lastSendIndex/lastSendTime for every peer slot (voters plus learners). Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}
2531

2532
// Move a FOLLOWER into the CANDIDATE role; any other starting state is logged as an error and left as is.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
    pSyncNode->roleTimeMs = taosGetTimestampMs();

    SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
          pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

    sNTrace(pSyncNode, "follower to candidate");
  } else {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
  }
}
2545

2546
// Promote an ASSIGNED_LEADER to a regular LEADER and append a noop entry.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not an assigned leader, 0 otherwise.
// A failed noop append is logged only.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " before "assigned commit index" was missing, fusing two fields in the log line
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2564

2565
// just called by syncNodeVoteForSelf
2566
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
497,118✔
2567
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
497,118✔
2568
  if (term != storeTerm) {
497,118✔
2569
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2570
    return;
×
2571
  }
2572
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
497,118✔
2573
  bool voted = raftStoreHasVoted(pSyncNode);
497,118✔
2574
  if (voted) {
497,118✔
2575
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2576
    return;
×
2577
  }
2578

2579
  raftStoreVote(pSyncNode, pRaftId);
497,118✔
2580
}
2581

2582
// simulate get vote from outside
2583
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
497,118✔
2584
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
497,118✔
2585

2586
  SRpcMsg rpcMsg = {0};
497,118✔
2587
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
497,118✔
2588
  if (ret != 0) return;
497,118✔
2589

2590
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
497,118✔
2591
  pMsg->srcId = pSyncNode->myRaftId;
497,118✔
2592
  pMsg->destId = pSyncNode->myRaftId;
497,118✔
2593
  pMsg->term = currentTerm;
497,118✔
2594
  pMsg->voteGranted = true;
497,118✔
2595

2596
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
497,118✔
2597
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
497,118✔
2598
  rpcFreeCont(rpcMsg.pCont);
497,118✔
2599
}
2600

2601
// return if has a snapshot
2602
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
5,956,416✔
2603
  bool      ret = false;
5,956,416✔
2604
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
5,956,416✔
2605
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
5,957,029✔
2606
    // TODO check return value
2607
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
5,956,881✔
2608
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
5,956,804✔
2609
      ret = true;
1,191,887✔
2610
    }
2611
  }
2612
  return ret;
5,955,649✔
2613
}
2614

2615
// return max(logLastIndex, snapshotLastIndex)
2616
// if no snapshot and log, return -1
2617
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
5,972,799✔
2618
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
5,972,799✔
2619
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
5,973,877✔
2620
    // TODO check return value
2621
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
5,973,958✔
2622
  }
2623
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
5,974,601✔
2624

2625
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
5,974,586✔
2626
  return lastIndex;
5,974,586✔
2627
}
2628

2629
// return the last term of snapshot and log
2630
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2631
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
5,956,416✔
2632
  SyncTerm lastTerm = 0;
5,956,416✔
2633
  if (syncNodeHasSnapshot(pSyncNode)) {
5,956,416✔
2634
    // has snapshot
2635
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1,191,887✔
2636
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
1,191,887✔
2637
      // TODO check return value
2638
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
1,191,887✔
2639
    }
2640

2641
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
1,191,887✔
2642
    if (logLastIndex > snapshot.lastApplyIndex) {
1,191,381✔
2643
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
837,647✔
2644
    } else {
2645
      lastTerm = snapshot.lastApplyTerm;
353,734✔
2646
    }
2647

2648
  } else {
2649
    // no snapshot
2650
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
4,764,917✔
2651
  }
2652

2653
  return lastTerm;
5,953,013✔
2654
}
2655

2656
// get last index and term along with snapshot
2657
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
5,096,396✔
2658
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
5,096,396✔
2659
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
5,098,164✔
2660
  return 0;
5,094,044✔
2661
}
2662

2663
#ifdef BUILD_NO_CALL
2664
// return append-entries first try index
2665
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2666
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2667
  return syncStartIndex;
2668
}
2669

2670
// if index > 0, return index - 1
2671
// else, return -1
2672
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2673
  SyncIndex preIndex = index - 1;
2674
  if (preIndex < SYNC_INDEX_INVALID) {
2675
    preIndex = SYNC_INDEX_INVALID;
2676
  }
2677

2678
  return preIndex;
2679
}
2680

2681
// if index < 0, return SYNC_TERM_INVALID
2682
// if index == 0, return 0
2683
// if index > 0, return preTerm
2684
// if error, return SYNC_TERM_INVALID
2685
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2686
  if (index < SYNC_INDEX_BEGIN) {
2687
    return SYNC_TERM_INVALID;
2688
  }
2689

2690
  if (index == SYNC_INDEX_BEGIN) {
2691
    return 0;
2692
  }
2693

2694
  SyncTerm  preTerm = 0;
2695
  SyncIndex preIndex = index - 1;
2696

2697
  SSyncRaftEntry* pPreEntry = NULL;
2698
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2699
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2700
  int32_t         code = 0;
2701
  if (h) {
2702
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2703
    code = 0;
2704

2705
    pSyncNode->pLogStore->cacheHit++;
2706
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2707

2708
  } else {
2709
    pSyncNode->pLogStore->cacheMiss++;
2710
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2711

2712
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2713
  }
2714

2715
  SSnapshot snapshot = {.data = NULL,
2716
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2717
                        .lastApplyTerm = SYNC_TERM_INVALID,
2718
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2719

2720
  if (code == 0) {
2721
    if (pPreEntry == NULL) return -1;
2722
    preTerm = pPreEntry->term;
2723

2724
    if (h) {
2725
      taosLRUCacheRelease(pCache, h, false);
2726
    } else {
2727
      syncEntryDestroy(pPreEntry);
2728
    }
2729

2730
    return preTerm;
2731
  } else {
2732
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2733
      // TODO check return value
2734
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2735
      if (snapshot.lastApplyIndex == preIndex) {
2736
        return snapshot.lastApplyTerm;
2737
      }
2738
    }
2739
  }
2740

2741
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2742
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2743
  return SYNC_TERM_INVALID;
2744
}
2745

2746
// get pre index and term of "index"
2747
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2748
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2749
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2750
  return 0;
2751
}
2752
#endif
2753

2754
static void syncNodeEqPingTimer(void* param, void* tmrId) {
29,152,490✔
2755
  if (!syncIsInit()) return;
29,152,490✔
2756

2757
  int64_t    rid = (int64_t)param;
29,152,386✔
2758
  SSyncNode* pNode = syncNodeAcquire(rid);
29,152,386✔
2759

2760
  if (pNode == NULL) return;
29,152,490✔
2761

2762
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
29,152,490✔
2763
    SRpcMsg rpcMsg = {0};
29,152,386✔
2764
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
29,152,386✔
2765
                                    pNode->pingTimerMS, pNode);
2766
    if (code != 0) {
29,151,859✔
2767
      sError("failed to build ping msg");
×
2768
      rpcFreeCont(rpcMsg.pCont);
×
2769
      goto _out;
×
2770
    }
2771

2772
    // sTrace("enqueue ping msg");
2773
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
29,151,859✔
2774
    if (code != 0) {
29,152,490✔
2775
      sError("failed to sync enqueue ping msg since %s", terrstr());
1,229✔
2776
      rpcFreeCont(rpcMsg.pCont);
1,229✔
2777
      goto _out;
1,229✔
2778
    }
2779

2780
  _out:
29,152,490✔
2781
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
29,152,490✔
2782
                     &pNode->pPingTimer))
2783
      sError("failed to reset ping timer");
×
2784
  }
2785
  syncNodeRelease(pNode);
29,152,490✔
2786
}
2787

2788
static void syncNodeEqElectTimer(void* param, void* tmrId) {
503,472✔
2789
  if (!syncIsInit()) return;
503,472✔
2790

2791
  int64_t    rid = (int64_t)param;
503,472✔
2792
  SSyncNode* pNode = syncNodeAcquire(rid);
503,472✔
2793

2794
  if (pNode == NULL) return;
503,472✔
2795

2796
  if (pNode->syncEqMsg == NULL) {
500,534✔
2797
    syncNodeRelease(pNode);
×
2798
    return;
×
2799
  }
2800

2801
  int64_t tsNow = taosGetTimestampMs();
500,534✔
2802
  if (tsNow < pNode->electTimerParam.executeTime) {
500,534✔
2803
    syncNodeRelease(pNode);
1,835✔
2804
    return;
1,835✔
2805
  }
2806

2807
  SRpcMsg rpcMsg = {0};
498,699✔
2808
  int32_t code =
2809
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
498,699✔
2810

2811
  if (code != 0) {
498,699✔
2812
    sError("failed to build elect msg");
×
2813
    syncNodeRelease(pNode);
×
2814
    return;
×
2815
  }
2816

2817
  SyncTimeout* pTimeout = rpcMsg.pCont;
498,699✔
2818
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
498,699✔
2819

2820
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
498,699✔
2821
  if (code != 0) {
498,699✔
2822
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2823
    rpcFreeCont(rpcMsg.pCont);
×
2824
    syncNodeRelease(pNode);
×
2825
    return;
×
2826
  }
2827

2828
  syncNodeRelease(pNode);
498,699✔
2829
}
2830

2831
#ifdef BUILD_NO_CALL
2832
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2833
  if (!syncIsInit()) return;
2834

2835
  int64_t    rid = (int64_t)param;
2836
  SSyncNode* pNode = syncNodeAcquire(rid);
2837

2838
  if (pNode == NULL) return;
2839

2840
  if (pNode->totalReplicaNum > 1) {
2841
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2842
      SRpcMsg rpcMsg = {0};
2843
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2844
                                      pNode->heartbeatTimerMS, pNode);
2845

2846
      if (code != 0) {
2847
        sError("failed to build heartbeat msg");
2848
        goto _out;
2849
      }
2850

2851
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2852
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2853
      if (code != 0) {
2854
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2855
        rpcFreeCont(rpcMsg.pCont);
2856
        goto _out;
2857
      }
2858

2859
    _out:
2860
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2861
                       &pNode->pHeartbeatTimer) != 0)
2862
        return;
2863

2864
    } else {
2865
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2866
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2867
    }
2868
  }
2869
}
2870
#endif
2871

2872
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
23,261,721✔
2873
  if (tsSyncLogHeartbeat) {
23,261,721✔
2874
    sInfo("heartbeat timer start");
×
2875
  }
2876
  int32_t code = 0;
23,261,721✔
2877
  int64_t hbDataRid = (int64_t)param;
23,261,721✔
2878
  int64_t tsNow = taosGetTimestampMs();
23,261,721✔
2879

2880
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
23,261,721✔
2881
  if (pData == NULL) {
23,261,721✔
2882
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2883
    return;
×
2884
  }
2885

2886
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
23,261,721✔
2887
  if (pSyncNode == NULL) {
23,261,721✔
2888
    syncHbTimerDataRelease(pData);
131✔
2889
    sError("hb timer get pSyncNode NULL");
131✔
2890
    return;
131✔
2891
  }
2892

2893
  SSyncTimer* pSyncTimer = pData->pTimer;
23,261,590✔
2894

2895
  if (!pSyncNode->isStart) {
23,261,590✔
2896
    syncNodeRelease(pSyncNode);
×
2897
    syncHbTimerDataRelease(pData);
×
2898
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2899
    return;
×
2900
  }
2901

2902
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
23,261,590✔
2903
    syncNodeRelease(pSyncNode);
×
2904
    syncHbTimerDataRelease(pData);
×
2905
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2906
    return;
×
2907
  }
2908

2909
  if (tsSyncLogHeartbeat) {
23,261,590✔
2910
    sInfo("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
×
2911
          pData->destId.addr);
2912
  } else {
2913
    sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
23,261,590✔
2914
           pData->destId.addr);
2915
  }
2916

2917
  if (pSyncNode->totalReplicaNum > 1) {
23,261,590✔
2918
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
23,261,289✔
2919
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
23,261,289✔
2920

2921
    if (timerLogicClock == msgLogicClock) {
23,261,289✔
2922
      if (tsNow > pData->execTime) {
23,260,896✔
2923
        pData->execTime += pSyncTimer->timerMS;
23,256,828✔
2924

2925
        SRpcMsg rpcMsg = {0};
23,256,828✔
2926
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
23,256,828✔
2927
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2928
          syncNodeRelease(pSyncNode);
×
2929
          syncHbTimerDataRelease(pData);
×
2930
          return;
×
2931
        }
2932

2933
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
23,256,828✔
2934

2935
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
23,256,828✔
2936
        pSyncMsg->srcId = pSyncNode->myRaftId;
23,256,828✔
2937
        pSyncMsg->destId = pData->destId;
23,256,828✔
2938
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
23,256,828✔
2939
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
23,256,828✔
2940
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
23,256,828✔
2941
        pSyncMsg->privateTerm = 0;
23,256,828✔
2942
        pSyncMsg->timeStamp = tsNow;
23,256,828✔
2943

2944
        // update reset time
2945
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
23,256,828✔
2946
        pSyncTimer->timeStamp = tsNow;
23,256,828✔
2947

2948
        // send msg
2949
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
23,256,828✔
2950
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
23,256,828✔
2951
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime, &(rpcMsg.info.traceId));
23,256,828✔
2952
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
23,256,828✔
2953
        if (ret != 0) {
23,256,828✔
2954
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
21,054✔
2955
        }
2956
      }
2957

2958
      if (syncIsInit()) {
23,260,896✔
2959
        if (tsSyncLogHeartbeat) {
23,260,896✔
2960
          sInfo("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
×
2961
        } else {
2962
          sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
23,260,896✔
2963
        }
2964
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
23,260,896✔
2965
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
23,260,896✔
2966
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
23,260,896✔
2967

2968
      } else {
2969
        sError("sync env is stop, reset peer hb timer error");
×
2970
      }
2971

2972
    } else {
2973
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
393✔
2974
             timerLogicClock, msgLogicClock);
2975
    }
2976

2977
    if (tsSyncLogHeartbeat) {
23,261,289✔
2978
      sInfo("vgId:%d, finish send sync-heartbeat", pSyncNode->vgId);
×
2979
    }
2980
  }
2981

2982
  syncHbTimerDataRelease(pData);
23,261,590✔
2983
  syncNodeRelease(pSyncNode);
23,261,590✔
2984
  if (tsSyncLogHeartbeat) {
23,261,590✔
2985
    sInfo("heartbeat timer stop");
×
2986
  }
2987
}
2988

2989
#ifdef BUILD_NO_CALL
2990
// Deleter callback handed to taosLRUCacheInsert() by syncCacheEntry():
// releases the cached value with taosMemoryFree(). The key, its length and
// the user-data pointer are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2994

2995
// Insert a raft entry into the log store's LRU cache, keyed by the entry index.
//
// pLogStore - log store whose pCache receives the entry
// pEntry    - entry to cache; the charge given to the cache is sizeof(*pEntry) + pEntry->dataLen
// h         - forwarded to taosLRUCacheInsert() as the handle out-parameter
//
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
3009
#endif
3010

3011
// Build a sync configuration from an alter-vnode-replica request.
//
// Voter replicas (pReq->replicas) are written to cfg->nodeInfo first, learner replicas
// (pReq->learnerReplicas) are appended after them. For every node, pointers to its id, cluster id,
// fqdn and port are handed to tmsgUpdateDnodeInfo() and the outcome is logged.
//
// On return:
//   cfg->replicaNum      - number of voter entries written
//   cfg->totalReplicaNum - pReq->replica plus the number of learner entries written
//   cfg->myIndex         - pReq->selfIndex when it is not -1; overridden by
//                          pReq->replica + pReq->learnerSelfIndex when learnerSelfIndex is not -1;
//                          left untouched when both are -1
//   cfg->changeVersion   - copied from the request
//
// NOTE(review): the replica counts are not checked against the capacity of cfg->nodeInfo here;
// presumably the request has been validated upstream - confirm.
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters occupy nodeInfo[0 .. replica)
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners follow the voters; learnerIdx walks pReq->learnerReplicas while i walks cfg->nodeInfo
  int32_t learnerIdx = 0;
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i, ++learnerIdx) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[learnerIdx].id;
    pNode->nodePort = pReq->learnerReplicas[learnerIdx].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[learnerIdx].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
  }
  cfg->totalReplicaNum = learnerIdx + pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
×
3047

3048
// Inspect a config-change entry and step down if this node, currently acting as leader
// (or assigned leader), is not part of the new configuration.
//
// Returns:
//   TSDB_CODE_SYN_INTERNAL_ERROR - the entry is not a TDMT_SYNC_CONFIG_CHANGE entry
//   TSDB_CODE_INVALID_MSG        - the request carried by the entry cannot be deserialized
//   1                            - syncNodeStepDown() was called because this node is absent from the new cfg
//   0                            - nothing had to be done
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the serialized request sits right behind the message head
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  bool actingLeader = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (newCfg.totalReplicaNum < 1 || !actingLeader) {
    return 0;
  }

  // a node is identified by fqdn + port
  for (int32_t j = 0; j < newCfg.totalReplicaNum; ++j) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[j].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == newCfg.nodeInfo[j].nodePort) {
      return 0;  // still a member, keep the current role
    }
  }

  // this node is not in the new configuration
  SyncTerm termNow = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, termNow, id, "changeConfig");
  return 1;
}
3086

3087
// Dump the node's current membership state to the info log, every line tagged with `str`.
//
// Logged, in order: node summary, myNodeInfo, each peer's node info, each peer's endpoint set,
// each peer's raft id, each cfg nodeInfo entry and each replica raft id.
//
// ths - node whose state is logged
// cfg - not read by this function (kept for interface compatibility)
// str - free-form tag inserted into every log line
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  (void)cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;

    // Render the endpoint set of peer i (not of peer 0) as "{fqdn:port, fqdn:port}".
    // The endpoint index is `j` so that it cannot shadow the peer index `i`.
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3133

3134
// Rebuild the node's peer tables and its stored raft cfg from a new configuration.
//
// Pass 1 fills peersNodeInfo / peersEpset / peersId with every cfg node that is not this node
// (a node is identified by fqdn + port) and then sets peersNum.
// Pass 2 copies every cfg node into raftCfg.cfg.nodeInfo, counts the voters into
// raftCfg.cfg.replicaNum, records this node's position in raftCfg.cfg.myIndex and sets
// raftCfg.cfg.totalReplicaNum.
//
// Returns 0 on success, or terrno when a raft id cannot be derived for a peer; in that case
// peersNum and raftCfg.cfg are not updated, but the peer slots written so far keep their new values.
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];

    bool isSelf = (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 &&
                   ths->myNodeInfo.nodePort == pSrc->nodePort);
    if (isSelf) continue;

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCnt];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCnt]);

    if (syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCnt]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t nodeCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j, ++nodeCnt) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    SNodeInfo*       pDst = &ths->raftCfg.cfg.nodeInfo[nodeCnt];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = nodeCnt;
    }
  }
  ths->raftCfg.cfg.totalReplicaNum = nodeCnt;

  return 0;
}
3182

3183
// Promote nodes to voter according to a new configuration, without changing membership.
//
// Every peer and every stored cfg node that matches (by fqdn + port) a cfg entry whose role is
// TAOS_SYNC_ROLE_VOTER gets its role set to voter. raftCfg.cfg.replicaNum is reset and then
// incremented once per such match found among the stored cfg nodes. Roles are never downgraded here.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      if (strcmp(ths->peersNodeInfo[p].nodeFqdn, cfg->nodeInfo[c].nodeFqdn) != 0) continue;
      if (ths->peersNodeInfo[p].nodePort != cfg->nodeInfo[c].nodePort) continue;
      if (cfg->nodeInfo[c].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      ths->peersNodeInfo[p].nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      if (strcmp(ths->raftCfg.cfg.nodeInfo[n].nodeFqdn, cfg->nodeInfo[c].nodeFqdn) != 0) continue;
      if (ths->raftCfg.cfg.nodeInfo[n].nodePort != cfg->nodeInfo[c].nodePort) continue;
      if (cfg->nodeInfo[c].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      ths->raftCfg.cfg.nodeInfo[n].nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3210

3211
// Rebuild all per-replica bookkeeping of the node after raftCfg.cfg has been changed, carrying
// over the state of replicas that exist in both the old and the new configuration.
//
// Steps: (1) replicasId, (2) pMatchIndex, (3) pNextIndex, (4) vote managers, (5) logReplMgrs,
// (6) snapshot senders, (7) heartbeat timers, (8) peerStates.
//
// ths                - node to rebuild; raftCfg.cfg must already hold the new configuration
// oldtotalReplicaNum - total replica count before the change; bounds the "old" side of every copy
//
// Returns 0 on success, otherwise the error of the step that failed. State changed by the steps
// completed before the failure is not rolled back.
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  // The new manager is built first so that a failed allocation neither leaks the old manager
  // nor leaves ths->pMatchIndex NULL.
  SSyncIndexMgr* newMatchIndex = syncIndexMgrCreate(ths);
  if (newMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(newMatchIndex, ths->pMatchIndex, oldReplicasId);
  syncIndexMgrDestroy(ths->pMatchIndex);
  ths->pMatchIndex = newMatchIndex;

  // 3.rebuild NextIndex, remove deleted one (same pattern as step 2)
  SSyncIndexMgr* newNextIndex = syncIndexMgrCreate(ths);
  if (newNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(newNextIndex, ths->pNextIndex, oldReplicasId);
  syncIndexMgrDestroy(ths->pNextIndex);
  ths->pNextIndex = newNextIndex;

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // snapshot of the old managers, taken by value because the originals are destroyed below
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  SSyncLogReplMgr* oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // The snapshot is not needed past this point. Releasing it here means none of the later
  // error returns (sender creation, timer handling) can leak it.
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = code;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3362

3363
// Apply a role change (learner -> voter) to the node's derived state when membership itself
// is unchanged: only the voter count is propagated, the per-replica arrays stay as they are.
//
// Updates ths->replicaNum and the replicaNum of pMatchIndex / pNextIndex from raftCfg.cfg.replicaNum,
// then refreshes both vote managers. logReplMgrs are left untouched.
void syncNodeChangeToVoter(SSyncNode* ths) {
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  SSyncIndexMgr* pNext = ths->pNextIndex;

  // replicasId, only need to change replicaNum when 1->3
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  pNext->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t r = 0; r < pMatch->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, pMatch->index[r]);
  }

  // pVotesGranted, pVotesRespond
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logRepMgrs: no need to change logRepMgrs when 1->3
}
×
3387

3388
// Zero out the first peersNum entries of peersNodeInfo and the first
// raftCfg.cfg.totalReplicaNum entries of raftCfg.cfg.nodeInfo.
// The counters themselves (peersNum, totalReplicaNum) are not modified.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    memset(&ths->peersNodeInfo[p], 0, sizeof(SNodeInfo));
  }

  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    memset(&ths->raftCfg.cfg.nodeInfo[n], 0, sizeof(SNodeInfo));
  }
}
×
3398

3399
// Apply a config-change raft entry to this node.
//
// The entry payload (after the SMsgHead) is deserialized into an alter-replica request and turned
// into an SSyncCfg. Entries whose changeVersion is not newer than the node's current one are
// skipped. Otherwise one of three paths is taken, chosen by the new total replica count and the
// node's current total replica count:
//   - new total is 1 or 2            : remove replica (either other nodes, or this node itself)
//   - otherwise, current total is 3  : change replica type (learner -> voter)
//   - otherwise                      : add replica
// Afterwards quorum, lastConfigIndex, cfg.lastIndex and cfg.changeVersion are updated and the
// cfg file is written.
//
// ths    - node to reconfigure
// pEntry - entry carrying the request; must have originalRpcType TDMT_SYNC_CONFIG_CHANGE
// str    - origin tag used in logs; the value "Commit" selects a different log layout
//
// Returns 0 on success or when the entry is skipped, TSDB_CODE_SYN_INTERNAL_ERROR for a wrong
// entry type, TSDB_CODE_INVALID_MSG when deserialization fails, or the error of the failing
// rebuild / cfg-file step.
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the serialized request sits right behind the message head
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  const bool fromCommit = (strcmp(str, "Commit") == 0);
  if (fromCommit) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  // captured before any rebuild overwrites ths->totalReplicaNum
  const int32_t prevTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // ---- remove replica ----
    bool stillMember = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum && !stillMember; ++j) {
      stillMember = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
                     ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort);
    }

    if (stillMember) {
      // remove other; no need to change myNodeInfo
      syncNodeResetPeerAndCfg(ths);

      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // remove myself
      // no need to do anything actually, to change the following to reduce disruptive server chance
      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: this node becomes the only (learner) entry
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // change other
      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // ---- change replica type ----
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // change myNodeInfo
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      bool isMe = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
                   ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort);
      if (isMe && cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // change peer and cfg
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);

    // change other
    syncNodeChangeToVoter(ths);

    // change state: a learner that has just become a voter starts as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // ---- add replica ----
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // no need to change myNodeInfo

    // change peer and cfg
    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // change other
    code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // no need to change state

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths, "apply_config_change_entry");
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3572

3573
// Append a raft entry to the node's log buffer and drive the buffer forward.
//
// ths    - sync node
// pEntry - entry to append. When the entry is rejected (payload shorter than SMsgHead) or the
//          buffer append fails, the entry is destroyed here; in the latter case syncFsmExecute()
//          is called with terrno first.
// pMsg   - originating rpc message (may be NULL); forwarded to syncLogBufferProceed() and used in logs
//
// After the append attempt the buffer is always proceeded. An assigned leader then updates and
// commits its assigned commit index. With more than one voter replica the function returns at that
// point; with a single replica it also advances the commit index and commits the log buffer.
//
// NOTE(review): on the single-replica path `code` is overwritten by the return value of
// syncLogBufferCommit() whenever fsmState is not SYNC_FSM_STATE_INCOMPLETE, so an earlier append
// failure is not reflected in the return value in that case - confirm this is intended.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = -1;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    // append to log buffer failed: report through the fsm, then drop the entry
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    int32_t fsmCode = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (fsmCode != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index,
             tstrerror(fsmCode));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, with replicating on needed
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = (pEntry != NULL) ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
              pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica: the commit index is advanced elsewhere
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry")) < 0) {
    sGError(trace,
            "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
            ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
  }

  TAOS_RETURN(code);
}
3641

3642
// Tell whether heartbeat replies have timed out for at least `quorum` peers.
//
// Always false when totalReplicaNum is 1. Learner peers are ignored, and so are peers whose
// recorded receive time in pMatchIndex is 0 or -1. A remaining peer counts as timed out when
// (now - its receive time) exceeds tsHeartbeatTimeout.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t timedOutPeers = 0;
  int64_t nowMs = taosGetTimestampMs();

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecvMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    noRecvTime = (lastRecvMs == 0 || lastRecvMs == -1);

    if (!noRecvTime && nowMs - lastRecvMs > tsHeartbeatTimeout) {
      timedOutPeers++;
    }
  }

  return timedOutPeers >= pSyncNode->quorum;
}
3667

3668
// Return true when any of the first totalReplicaNum snapshot senders exists and has its
// `start` flag set; false otherwise, including when pSyncNode is NULL.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[r];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}
3679

3680
// Return true when the node has a snapshot receiver (pNewNodeReceiver) whose `start` flag is
// set; false otherwise, including when pSyncNode or the receiver is NULL.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3686

3687
// Build a no-op raft entry at the log buffer's end index with the current term and append it
// through syncNodeAppend().
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise the result of
// syncNodeAppend().
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  if (pNoop == NULL) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t code = syncNodeAppend(ths, pNoop, NULL);
  TAOS_RETURN(code);
}
3701

3702
#ifdef BUILD_NO_CALL
3703
// Legacy no-op append (compiled only under BUILD_NO_CALL): builds a no-op entry at the log
// store's write index and, when this node is leader, appends it to the log store and caches it.
//
// The entry is owned by this function unless the cache returned a handle for it: with a handle
// the handle is released, without one the entry is destroyed. The entry is also destroyed when
// the log-store append fails, so that error path no longer leaks it.
//
// Returns 0 on success, -1 when the entry cannot be built or the append fails.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      syncEntryDestroy(pEntry);
      return -1;
    }

    // best effort: when caching fails `h` stays NULL and the entry is destroyed below
    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}
3731
#endif
3732

3733
// Handles a sync-heartbeat message received from a peer.
//
// Steps, in order:
//   1. record receive time / receive count for the sender and log the message;
//   2. drop the message (return 0) if the sender is not in this raft group;
//   3. a learner adopts a higher term carried by the heartbeat;
//   4. on a matching term, a non-leader marks the election timer for reset,
//      takes over minMatchIndex, and (follower/learner only) enqueues a local
//      commit command carrying the leader's commit index;
//   5. a leader / assigned leader that sees a term >= its own enqueues a local
//      step-down command;
//   6. a heartbeat reply is built and sent back to the sender;
//   7. the election timer is reset if step 4 asked for it.
//
// Returns 0 on success (including the dropped case). A failure to build a
// local command or the reply, or to send the reply, is propagated through
// TAOS_CHECK_RETURN; in that case the election timer is not reset.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  // read the previous receive time before overwriting it, it feeds timeDiff
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff, pRpcMsg);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  // only a learner follows a higher term directly from the heartbeat
  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue sync-local-cmd msg(cmd=commit) from heartbeat since %s",
                 ths->vgId, tstrerror(code));
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue sync-local-cmd msg(cmd=commit) from heartbeat, commit-index:%" PRId64
                  ", term:%" PRId64,
                  ths->vgId, pMsg->commitIndex, pMsg->term);
        }
      } else {
        // nobody will consume the command, so release its content here
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down) error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down), new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // nobody will consume the command, so release its content here
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    // tsSyncLogHeartbeat raises the level of the normal-path log to info
    if (tsSyncLogHeartbeat) {
      sGInfo(&rpcMsg.info.traceId,
             "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
             ", processTime:%" PRId64,
             ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    } else {
      sGDebug(&rpcMsg.info.traceId,
              "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
              ", processTime:%" PRId64,
              ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    }
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3875

3876
// Handles a sync-heartbeat-reply received from a peer.
//
// Looks up the log replication manager of the replying peer, logs the reply
// together with the elapsed time since the peer's timestamp and since the
// previous reply, refreshes the peer's receive time / receive count in
// pMatchIndex and finally lets the replication manager process the reply.
//
// Returns the result of syncLogReplProcessHeartbeatReply(). When no manager
// is found for the peer, returns terrno if it is set, otherwise
// TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // zero-padded 16-digit hex address
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  // read the previous receive time before it is overwritten below
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  syncIndexMgrIncRecvCount(ths->pMatchIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3897

3898
#ifdef BUILD_NO_CALL
3899
// Legacy heartbeat-reply handler (only compiled under BUILD_NO_CALL).
// Logs the reply and stores "now" as the peer's last receive time in
// pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();
  int64_t             elapsedMs = nowMs - pReply->timeStamp;

  // the same delta is passed for both time arguments of the log helper
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // remember when the peer last answered; used to judge whether it is alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
3910
#endif
3911

3912
// Executes a sync-local-cmd message that was enqueued by this node itself.
//
//   - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried by the command.
//   - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: if the
//     command's term equals the last matched term of the log buffer, update
//     the commit index; then, unless the FSM state is incomplete, commit the
//     log buffer up to ths->commitIndex.
//   - anything else is only logged.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the last matched term is negative,
// 0 in every other case (including an empty log buffer and a failed commit,
// which are logged only).
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId, "localCmd");
    return 0;
  }

  bool isCommitCmd = (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) || (pCmd->cmd == SYNC_LOCAL_CMD_LEARNER_CMT);
  if (!isCommitCmd) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the commit index is only taken over when the terms agree
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    int32_t ret = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat");
    if (ret < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
             ths->commitIndex);
    }
  }

  return 0;
}
3944

3945
// TLA+ Spec
3946
// ClientRequest(i, v) ==
3947
//     /\ state[i] = Leader
3948
//     /\ LET entry == [term  |-> currentTerm[i],
3949
//                      value |-> v]
3950
//            newLog == Append(log[i], entry)
3951
//        IN  log' = [log EXCEPT ![i] = newLog]
3952
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3953
//                    leaderVars, commitIndex>>
3954
//
3955

3956
// Turns a client request into a raft entry and appends it on the leader.
//
// The entry is built at the end index of the log buffer with the current
// term. For a TDMT_SYNC_CONFIG_CHANGE message a response stub holding a copy
// of the message is registered first and its sequence number is stored in the
// entry. Only a leader / assigned leader goes on: *pRetIndex (if given)
// receives the entry index, a config-change entry is validated with
// syncNodeCheckChangeConfig(), and the entry is passed to syncNodeAppend().
//
// Returns:
//   - TSDB_CODE_SYN_INTERNAL_ERROR if the entry cannot be built or this node
//     is not a leader (the entry is destroyed in the latter case);
//   - the non-zero result of syncNodeCheckChangeConfig(); the entry is
//     destroyed, and for a positive result the registered stub is fetched,
//     removed and answered first;
//   - otherwise the result of syncNodeAppend().
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);

  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry =
      (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
          ? syncEntryBuildFromClientRequest(pMsg->pCont, curTerm, endIndex, &pMsg->info.traceId)
          : syncEntryBuildFromRpcMsg(pMsg, curTerm, endIndex);
  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    pEntry->seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
  }

  bool isLeader = (ths->state == TAOS_SYNC_STATE_LEADER) || (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!isLeader) {
    syncEntryDestroy(pEntry);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex != NULL) {
    *pRetIndex = endIndex;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    int32_t chkCode = syncNodeCheckChangeConfig(ths, pEntry);

    if (chkCode < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      TAOS_RETURN(chkCode);
    }

    if (chkCode > 0) {
      // answer the caller right away instead of appending the entry
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      TAOS_RETURN(chkCode);
    }
  }

  return syncNodeAppend(ths, pEntry, pMsg);
}
4020

4021
// Maps a sync state to its lower-case display name.
// Any value without a dedicated case yields "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      break;
  }

  return name;
}
4041

4042
// Finds this node inside a new sync configuration and stores its position in
// pNewCfg->myIndex.
// Returns 0 when found, TSDB_CODE_SYN_INTERNAL_ERROR when no replica of the
// new configuration matches this node's raft id (myIndex is left untouched).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;

  while (pos < pNewCfg->totalReplicaNum) {
    // raft id this replica would have inside our vgroup
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
4057

4058
// True only when all three hold: the node has exactly one replica, the
// message type passes syncUtilUserCommit(), and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}
4061

4062
// Checks whether pRaftId equals the id of any of the node's totalReplicaNum
// replicas. The scan stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;

  for (int32_t pos = 0; !found && pos < ths->totalReplicaNum; ++pos) {
    found = syncUtilSameId(&ths->replicasId[pos], pRaftId);
  }

  return found;
}
4070

4071
// Returns the snapshot sender stored at the replica slot whose id equals
// pDestId, or NULL when no slot matches.
// Every slot is visited, so if more than one slot matched, the last would win.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    ++slot;
  }

  return pFound;
}
4080

4081
// Returns a pointer to the peer heartbeat timer at the replica slot whose id
// equals pDestId, or NULL when no slot matches.
// Every slot is visited, so if more than one slot matched, the last would win.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;

  for (int32_t slot = 0; slot < ths->totalReplicaNum; ++slot) {
    bool same = syncUtilSameId(pDestId, &ths->replicasId[slot]);
    pFound = same ? &ths->peerHeartbeatTimerArr[slot] : pFound;
  }

  return pFound;
}
4090

4091
// Returns a pointer to the peer state at the replica slot whose id equals
// pDestId, or NULL when no slot matches.
// Every slot is visited, so if more than one slot matched, the last would win.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     slot = 0;

  for (; slot < ths->totalReplicaNum; ++slot) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      continue;
    }
    pFound = &ths->peerStates[slot];
  }

  return pFound;
}
4100

4101
#ifdef BUILD_NO_CALL
4102
// Decides whether an append-entries message should be sent to a peer.
// Returns false when the peer has no state entry, or when the index to send
// (prevLogIndex + 1) equals the last index sent to that peer and less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS has passed since that send; true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextToSend = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool resentTooSoon =
      (pPeer->lastSendIndex == nextToSend) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !resentTooSoon;
}
4118

4119
// Tells whether the node is in a state where a change may start.
// The answer is false (with an error log) when
//   - a change is already in progress,
//   - the commit index is at least SYNC_INDEX_BEGIN but differs from the last
//     log index, or
//   - a snapshot sender towards any peer has been started.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN &&
      pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
4143
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc