• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4983

13 Mar 2026 03:38AM UTC coverage: 68.653% (+0.07%) from 68.587%
#4983

push

travis-ci

web-flow
feat/6641435300-save-audit-in-self (#34738)

434 of 584 new or added lines in 10 files covered. (74.32%)

434 existing lines in 121 files now uncovered.

212745 of 309883 relevant lines covered (68.65%)

134272959.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.62
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Open a sync node described by pSyncInfo and register it.
// On success the node's timer baselines and message callbacks are copied from pSyncInfo and the
// id produced by syncNodeAdd() is returned. Returns -1 when the node cannot be opened or registered.
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion, pSyncInfo->electMs, pSyncInfo->heartbeatMs);
  if (!pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  // Registration failed: the node is closed again before reporting the error.
  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Ping timer settings.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // Election timer baseline.
  pNode->electBaseLine = pSyncInfo->electMs;

  // Heartbeat timer settings.
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened, electBaseLine:%d, hbBaseLine:%d", pSyncInfo->vgId, pNode->electBaseLine,
        pNode->hbBaseLine);
  return pNode->rid;
}
88

89
int32_t syncStart(int64_t rid) {
4,561,105✔
90
  int32_t    code = 0;
4,561,105✔
91
  int32_t    vgId = 0;
4,561,105✔
92
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,561,105✔
93
  if (pSyncNode == NULL) {
4,561,105✔
94
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
95
    if (terrno != 0) code = terrno;
×
96
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
97
    TAOS_RETURN(code);
×
98
  }
99
  vgId = pSyncNode->vgId;
4,561,105✔
100
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
4,561,105✔
101

102
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
4,561,105✔
103
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
104
    goto _err;
×
105
  }
106
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
4,561,105✔
107

108
  if ((code = syncNodeStart(pSyncNode)) < 0) {
4,561,821✔
109
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
110
    goto _err;
×
111
  }
112
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
4,561,105✔
113

114
  syncNodeRelease(pSyncNode);
4,561,105✔
115

116
  sInfo("vgId:%d, sync started", vgId);
4,561,105✔
117

118
  TAOS_RETURN(code);
4,561,105✔
119

120
_err:
×
121
  syncNodeRelease(pSyncNode);
×
122
  TAOS_RETURN(code);
×
123
}
124

125
// Copy the current raft configuration of the node referenced by rid into *cfg.
// Returns 0 on success, or the acquire error when the node cannot be acquired (*cfg is then untouched).
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (!pNode) {
    int32_t code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
    TAOS_RETURN(code);
  }

  // Struct copy while the reference is still held.
  *cfg = pNode->raftCfg.cfg;
  syncNodeRelease(pNode);

  return 0;
}
142

143
void syncStop(int64_t rid) {
4,561,105✔
144
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,561,105✔
145
  if (pSyncNode != NULL) {
4,561,105✔
146
    pSyncNode->isStart = false;
4,561,105✔
147
    syncNodeRelease(pSyncNode);
4,561,105✔
148
    syncNodeRemove(rid);
4,561,105✔
149
  }
150
}
4,561,105✔
151

152
void syncPreStop(int64_t rid) {
4,560,227✔
153
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,560,227✔
154
  if (pSyncNode != NULL) {
4,561,105✔
155
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
4,561,105✔
156
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
394✔
157
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
394✔
158
    }
159
    syncNodePreClose(pSyncNode);
4,560,518✔
160
    syncNodeRelease(pSyncNode);
4,560,125✔
161
  }
162
}
4,560,515✔
163

164
void syncPostStop(int64_t rid) {
4,120,384✔
165
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,120,384✔
166
  if (pSyncNode != NULL) {
4,120,384✔
167
    syncNodePostClose(pSyncNode);
4,120,384✔
168
    syncNodeRelease(pSyncNode);
4,120,384✔
169
  }
170
}
4,120,349✔
171

172
// A new config is accepted only when syncNodeInConfig() holds for it and its replica count
// differs from the node's current replica count by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int replicaDelta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
    return replicaDelta <= 1;
  }
  return false;
}
176

177
// Apply a new sync configuration to the node referenced by rid.
//
// Returns 0 when the config is applied or when the node's lastConfigIndex is already at or beyond
// pNewCfg->lastIndex; returns the acquire error when the node cannot be acquired;
// TSDB_CODE_SYN_NEW_CONFIG_ERROR when the new config is rejected or the change fails; otherwise
// the code of the failing helper.
//
// Every path after a successful acquire releases the node exactly once, and the node is never
// dereferenced after it has been released.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // Log first: the node must not be touched once the reference is dropped.
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != 0) goto _exit;

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // A leader re-initializes every per-peer heartbeat timer slot between stopping and
    // restarting the heartbeat timer.
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != 0) goto _exit;

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) goto _exit;
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != 0) goto _exit;
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
223

224
// Dispatch one sync message to the handler selected by pMsg->msgType on the node referenced by rid.
//
// Returns the handler's result, TSDB_CODE_MSG_NOT_PROCESSED for an unknown message type, or
// terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL when the sync module is not initialized or the node
// cannot be acquired.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Cached for the failure log below, which is written after the node has been released.
  int32_t vgId = pSyncNode->vgId;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
  }

  syncNodeRelease(pSyncNode);
  if (code != 0) {
    // pSyncNode must not be dereferenced here: the reference was dropped above.
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  TAOS_RETURN(code);
}
293

294
int32_t syncLeaderTransfer(int64_t rid) {
4,561,105✔
295
  int32_t    code = 0;
4,561,105✔
296
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,561,105✔
297
  if (pSyncNode == NULL) {
4,560,227✔
298
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
299
    if (terrno != 0) code = terrno;
×
300
    TAOS_RETURN(code);
×
301
  }
302

303
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
4,560,227✔
304
  syncNodeRelease(pSyncNode);
4,560,814✔
305
  return ret;
4,560,227✔
306
}
307

308
// Turn this node into a follower with an all-zero leader id, then answer the request with code 0,
// reusing the rsp buffer, rspLen and info carried in pRpcMsg->info. Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId emptyId = {0};
  syncNodeBecomeFollower(ths, emptyId, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
322

323
// Handle a set-assigned-leader request.
//
// The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its arbTerm is lower than the
// local one, or when it is not forced and its member token differs from the local arbToken.
// Otherwise the node moves to the next term and becomes assigned leader (unless it already is one)
// and a SVArbSetAssignedLeaderRsp is serialized as the reply payload.
//
// A response is always sent using pRpcMsg->info; the returned code is the one placed in it.
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // NOTE(review): the payload pointer skips SMsgHead but the full contLen is passed as its length — confirm.
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    // NOTE(review): failure is detected through terrno, which is not cleared before the call — confirm.
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // A noop-append failure is only logged; the request still proceeds to build its reply.
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  // First pass with a NULL buffer yields the encoded length.
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // The buffer is gone: clear the pointer so the response below cannot carry a dangling payload.
    pHead = NULL;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  // Never advertise a payload length without a payload buffer.
  if (pHead == NULL) contLen = 0;

  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
405

406
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
407
  int32_t    code = 0;
×
408
  SSyncNode* pNode = syncNodeAcquire(rid);
×
409
  if (pNode == NULL) {
×
410
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
411
    if (terrno != 0) code = terrno;
×
412
    TAOS_RETURN(code);
×
413
  }
414

415
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
416
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
417
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
418

419
  syncNodeRelease(pNode);
×
420
  if (ret == 1) {
×
421
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
422
    code = rpcSendResponse(&rpcMsg);
×
423
    return code;
×
424
  } else {
425
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
426
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
427
  }
428
}
429

430
// Scan the match index of every peer and return the running minimum.
// While the running value is still SYNC_INDEX_INVALID it is replaced by whatever the peer reports;
// afterwards only a positive, strictly smaller peer index replaces it.
// Returns SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex peerMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);

    bool unset = (lowest == SYNC_INDEX_INVALID);
    bool smaller = (peerMatch > 0 && peerMatch < lowest);
    if (unset || smaller) {
      lowest = peerMatch;
    }
  }

  return lowest;
}
443

444
// Forward to the log store's syncLogIndexRetention callback with the given byte budget.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retained = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retained;
}
447

448
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
6,122,046✔
449
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,122,046✔
450
  int32_t    code = 0;
6,121,714✔
451
  if (pSyncNode == NULL) {
6,121,714✔
452
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
5,444✔
453
    if (terrno != 0) code = terrno;
5,444✔
454
    sError("sync begin snapshot error");
5,444✔
455
    TAOS_RETURN(code);
5,444✔
456
  }
457

458
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
6,116,270✔
459
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
6,116,602✔
460
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
6,116,602✔
461

462
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
6,115,835✔
463
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
28,389✔
464
    syncNodeRelease(pSyncNode);
28,389✔
465
    return 0;
29,095✔
466
  }
467

468
  int64_t logRetention = 0;
6,087,507✔
469

470
  if (syncNodeIsMnode(pSyncNode)) {
6,087,507✔
471
    // mnode
472
    logRetention = tsMndLogRetention;
847,976✔
473
  } else {
474
    // vnode
475
    if (pSyncNode->replicaNum > 1) {
5,238,880✔
476
      logRetention = SYNC_VNODE_LOG_RETENTION;
660,798✔
477
    }
478
  }
479

480
  if (pSyncNode->totalReplicaNum > 1) {
6,086,856✔
481
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
744,274✔
482
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
56,934✔
483
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
431✔
484
              lastApplyIndex);
485
      syncNodeRelease(pSyncNode);
431✔
486
      return 0;
431✔
487
    }
488
    SyncIndex retentionIndex =
743,843✔
489
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
743,843✔
490
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
743,843✔
491
  }
492

493
_DEL_WAL:
5,343,233✔
494

495
  do {
496
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
6,087,076✔
497
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
6,087,076✔
498
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
6,086,744✔
499
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
6,086,425✔
500
    if (lastApplyIndex <= walCommitVer) {
6,085,658✔
501
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
6,085,658✔
502

503
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
6,086,425✔
504
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
6,086,425✔
505
        pSyncNode->snapshottingTime = taosGetTimestampMs();
6,086,309✔
506

507
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
6,087,076✔
508
        if (code == 0) {
6,087,076✔
509
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
6,086,335✔
510
                  pSyncNode->snapshottingIndex, lastApplyIndex);
511
        } else {
512
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
741✔
513
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
514
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
741✔
515
        }
516

517
      } else {
518
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
519
                snapshottingIndex, lastApplyIndex);
520
      }
521
    }
522
  } while (0);
523

524
  syncNodeRelease(pSyncNode);
6,087,076✔
525
  TAOS_RETURN(code);
6,087,076✔
526
}
527

528
int32_t syncEndSnapshot(int64_t rid, bool forceTrim) {
6,115,426✔
529
  int32_t    code = 0;
6,115,426✔
530
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,115,426✔
531
  if (pSyncNode == NULL) {
6,115,426✔
532
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
533
    if (terrno != 0) code = terrno;
×
534
    sError("sync end snapshot error");
×
535
    TAOS_RETURN(code);
×
536
  }
537

538
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
6,115,426✔
539
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
6,086,335✔
540
    code = walEndSnapshot(pData->pWal, forceTrim);
6,086,335✔
541
    if (code != 0) {
6,086,335✔
542
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
543
      syncNodeRelease(pSyncNode);
×
544
      TAOS_RETURN(code);
×
545
    } else {
546
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
6,086,335✔
547
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
6,086,335✔
548
    }
549
  }
550

551
  syncNodeRelease(pSyncNode);
6,115,861✔
552
  TAOS_RETURN(code);
6,115,861✔
553
}
554

555
// A node is ready for read when it is leader or assigned leader and its restore has finished.
// On a false result terrno is set to the reason: TSDB_CODE_SYN_INTERNAL_ERROR for a NULL node,
// TSDB_CODE_SYN_NOT_LEADER, or TSDB_CODE_SYN_RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (!pSyncNode) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool leading = (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!leading) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
574

575
bool syncIsReadyForRead(int64_t rid) {
687,157,644✔
576
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
687,157,644✔
577
  if (pSyncNode == NULL) {
687,172,899✔
578
    sError("sync ready for read error");
×
579
    return false;
×
580
  }
581

582
  bool ready = syncNodeIsReadyForRead(pSyncNode);
687,172,899✔
583

584
  syncNodeRelease(pSyncNode);
687,172,276✔
585
  return ready;
687,154,013✔
586
}
587

588
#ifdef BUILD_NO_CALL
589
bool syncSnapshotSending(int64_t rid) {
590
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
591
  if (pSyncNode == NULL) {
592
    return false;
593
  }
594

595
  bool b = syncNodeSnapshotSending(pSyncNode);
596
  syncNodeRelease(pSyncNode);
597
  return b;
598
}
599

600
bool syncSnapshotRecving(int64_t rid) {
601
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
602
  if (pSyncNode == NULL) {
603
    return false;
604
  }
605

606
  bool b = syncNodeSnapshotRecving(pSyncNode);
607
  syncNodeRelease(pSyncNode);
608
  return b;
609
}
610
#endif
611

612
// Pick a peer and hand leadership to it.
// Returns 0 without doing anything when there are no peers, when this node is not the leader, or
// when replicaNum is not above one. The first peer is chosen by default; with exactly two peers the
// second is chosen when its match index is strictly larger.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t chosen = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (secondMatch > firstMatch) {
      chosen = 1;
    }
  }

  SNodeInfo newLeader = pSyncNode->peersNodeInfo[chosen];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
633

634
// Build a leader-transfer message that names newLeader and propose it through syncNodePropose().
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when replicaNum is 1, otherwise the propose result.
// The message content is freed here after the propose call returns.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  // Fill in the target of the transfer.
  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newNodeInfo = newLeader;
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t proposeCode = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return proposeCode;
}
654

655
int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval) {
×
656
  int32_t code = 0;
×
657
  sInfo("sync Reset Timer, rid:%" PRId64, rid);
×
658
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
659
  if (pSyncNode == NULL) {
×
660
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
661
    if (terrno != 0) code = terrno;
×
662
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
663
    TAOS_RETURN(code);
×
664
  }
665
  pSyncNode->electBaseLine = electInterval;
×
666
  syncNodeResetElectTimer(pSyncNode);
×
667

668
  sInfo("vgId:%d, sync Reset Timer, rid:%" PRId64, pSyncNode->vgId, rid);
×
669
  code = syncNodeRestartHeartbeatTimer(pSyncNode, heartbeatInterval);
×
670

671
  syncNodeRelease(pSyncNode);
×
672
  return code;
×
673
}
674

675
SSyncState syncGetState(int64_t rid) {
910,324,874✔
676
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
910,324,874✔
677

678
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
910,324,874✔
679
  if (pSyncNode != NULL) {
910,343,295✔
680
    state.state = pSyncNode->state;
910,343,295✔
681
    state.roleTimeMs = pSyncNode->roleTimeMs;
910,342,388✔
682
    state.startTimeMs = pSyncNode->startTime;
910,337,720✔
683
    state.restored = pSyncNode->restoreFinish;
910,337,493✔
684
    if (pSyncNode->vgId != 1) {
910,341,653✔
685
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
145,246,283✔
686
    } else {
687
      state.canRead = state.restored;
765,085,872✔
688
    }
689
    state.totalIndex = pSyncNode->pLogBuf->totalIndex;
910,329,282✔
690

691
    double progress = 0;
910,327,900✔
692
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
910,327,900✔
693
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
944,851✔
694
      state.progress = (int32_t)(progress * 100);
944,851✔
695
    }
696
    else{
697
      state.progress = -1;
909,372,983✔
698
    }
699
    if (pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
910,317,834✔
700
      sInfo("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64
2,164,845✔
701
            ", "
702
            "progress:%lf, progress:%d",
703
            pSyncNode->vgId, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
704
    }
705

706
    state.term = raftStoreGetTerm(pSyncNode);
910,326,501✔
707
    state.snapSeq = pSyncNode->snapSeq;
910,343,012✔
708
    syncNodeRelease(pSyncNode);
910,343,573✔
709
  }
710

711
  return state;
910,339,667✔
712
}
713

714
SSyncMetrics syncGetMetrics(int64_t rid) {
×
715
  SSyncMetrics metrics = {0};
×
716

717
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
718
  if (pSyncNode != NULL) {
×
719
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
720
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
721
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
722
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
723
    syncNodeRelease(pSyncNode);
×
724
  }
725
  return metrics;
×
726
}
727

728
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
729
  if (pOldMetrics == NULL) return;
×
730

731
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
732
  if (pSyncNode != NULL) {
×
733
    // Atomically subtract the old metrics values from current metrics
734
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
735
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
736
    syncNodeRelease(pSyncNode);
×
737
  }
738
}
739

740
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
141,030,315✔
741
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
141,030,315✔
742
  if (pSyncNode != NULL) {
141,030,315✔
743
    *syncCommitIndex = pSyncNode->commitIndex;
141,030,315✔
744
    syncNodeRelease(pSyncNode);
141,030,315✔
745
  }
746
}
141,030,315✔
747

748
int32_t syncGetArbToken(int64_t rid, char* outToken) {
25,476,502✔
749
  int32_t    code = 0;
25,476,502✔
750
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
25,476,502✔
751
  if (pSyncNode == NULL) {
25,476,502✔
752
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
753
    if (terrno != 0) code = terrno;
×
754
    TAOS_RETURN(code);
×
755
  }
756

757
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
25,476,502✔
758
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
25,476,502✔
759
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
25,476,502✔
760
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
25,476,502✔
761

762
  syncNodeRelease(pSyncNode);
25,476,502✔
763
  TAOS_RETURN(code);
25,476,502✔
764
}
765

766
int32_t syncCheckSynced(int64_t rid) {
2,215✔
767
  int32_t    code = 0;
2,215✔
768
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,215✔
769
  if (pSyncNode == NULL) {
2,215✔
770
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
771
    if (terrno != 0) code = terrno;
×
772
    TAOS_RETURN(code);
×
773
  }
774

775
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2,215✔
776
    code = TSDB_CODE_SYN_NOT_LEADER;
×
777
    syncNodeRelease(pSyncNode);
×
778
    TAOS_RETURN(code);
×
779
  }
780

781
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
2,215✔
782
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
2,215✔
783
  if (!isSync) {
2,215✔
784
    sInfo("vgId:%d, not synced, assignedCommitIndex:%" PRId64 ", commitIndex:%" PRId64, pSyncNode->vgId,
×
785
          pSyncNode->assignedCommitIndex, pSyncNode->commitIndex);
786
  }
787

788
  syncNodeRelease(pSyncNode);
2,215✔
789
  TAOS_RETURN(code);
2,215✔
790
}
791

792
/*
 * Raise the arbTerm of the sync node identified by rid to arbTerm; a smaller or
 * equal arbTerm leaves the stored value as it is.
 *
 * Returns 0 on success; when the node cannot be acquired, returns terrno if it is
 * set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
 */
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  int32_t code = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Keep the larger of the stored and the supplied term.
  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }

  syncNodeRelease(pNode);
  TAOS_RETURN(code);
}
805

806
/*
 * Find the largest recorded config index that is greater than the first recorded
 * entry and not greater than snapshotLastApplyIndex; when no entry qualifies, the
 * first recorded entry is returned.
 *
 * Returns -2 and sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when no config index
 * is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex best = pSyncNode->raftCfg.configIndexArr[0];
  for (int32_t k = 0; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = pSyncNode->raftCfg.configIndexArr[k];
    if (candidate <= best) {
      continue;
    }
    if (candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         best);

  return best;
}
826

827
/*
 * Look up the raft id of the peer whose fqdn and port match pEp.
 * Returns EMPTY_RAFT_ID when no peer matches.
 */
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    if (strcmp(pEp->fqdn, pSyncNode->peersNodeInfo[peer].nodeFqdn) != 0) {
      continue;
    }
    if (pEp->port != pSyncNode->peersNodeInfo[peer].nodePort) {
      continue;
    }
    return pSyncNode->peersId[peer];
  }

  return EMPTY_RAFT_ID;
}
836

837
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
50,976,853✔
838
  if (pEpSet == NULL || buffer == NULL) {
50,976,853✔
UNCOV
839
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
UNCOV
840
    return;
×
841
  }
842

843
  size_t offset = 0;
50,978,811✔
844
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
50,978,811✔
845

846
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
135,676,164✔
847
    if (offset >= bufferSize) break;
84,695,067✔
848
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
84,687,836✔
849
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
84,695,067✔
850
  }
851

852
  if (offset < bufferSize) {
50,990,684✔
853
    snprintf(buffer + offset, bufferSize - offset, "]");
50,988,691✔
854
  }
855
}
856

857
/*
 * Fill pEpSet with the endpoints of all non-learner replicas of the sync node
 * identified by rid, and pick the endpoint to try first (inUse).
 *
 * pEpSet->numOfEps is reset to 0 first and stays 0 when the node cannot be acquired.
 * inUse selection, in order of preference:
 *   1. the slot whose fqdn and port equal the cached leader endpoint;
 *   2. raftCfg.cfg.myIndex when this node's raft id equals the cached leader id;
 *   3. (myIndex + 1) % numOfEps otherwise.
 * The set is passed to epsetSort() before it is logged.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return;
  }

  int leaderSlot = -1;

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pSyncNode->vgId, pSyncNode->leaderCache.addr, pSyncNode->leaderCacheEp.fqdn, pSyncNode->leaderCacheEp.port);

  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  int       slot = 0;  // next free position in pEpSet->eps
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;  // learners are not included in the retry set
    }

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;

    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pSyncNode->vgId, i, id.addr, pEp->fqdn,
           pEp->port);

    bool samePort = (pEp->port == pSyncNode->leaderCacheEp.port);
    if (samePort && strncmp(pEp->fqdn, pSyncNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
               pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char text[1024];
  epsetToString(pEpSet, text, sizeof(text));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, text,
         pEpSet->inUse);

  syncNodeRelease(pSyncNode);
}
905

906
/*
 * Acquire the sync node identified by rid and forward the proposal to
 * syncNodePropose(), returning its result unchanged.
 *
 * When the node cannot be acquired, returns terrno if it is set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(code);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
920

921
int32_t syncCheckMember(int64_t rid) {
×
922
  int32_t    code = 0;
×
923
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
924
  if (pSyncNode == NULL) {
×
925
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
926
    if (terrno != 0) code = terrno;
×
927
    sError("sync propose error");
×
928
    TAOS_RETURN(code);
×
929
  }
930

931
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
932
    syncNodeRelease(pSyncNode);
×
933
    return TSDB_CODE_SYN_WRONG_ROLE;
×
934
  }
935

936
  syncNodeRelease(pSyncNode);
×
937
  return 0;
×
938
}
939

940
int32_t syncIsCatchUp(int64_t rid) {
2,241,427✔
941
  int32_t    code = 0;
2,241,427✔
942
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,241,427✔
943
  if (pSyncNode == NULL) {
2,241,427✔
944
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
945
    if (terrno != 0) code = terrno;
×
946
    sError("sync Node Acquire error since %d", ERRNO);
×
947
    TAOS_RETURN(code);
×
948
  }
949

950
  int32_t isCatchUp = 0;
2,241,427✔
951
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
2,241,427✔
952
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
475,140✔
953
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
475,140✔
954
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
2,128,952✔
955
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
956
          pSyncNode->pLogBuf->matchIndex);
957
    isCatchUp = 0;
2,128,952✔
958
  } else {
959
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
112,475✔
960
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
961
    isCatchUp = 1;
112,475✔
962
  }
963

964
  syncNodeRelease(pSyncNode);
2,241,427✔
965
  return isCatchUp;
2,241,427✔
966
}
967

968
ESyncRole syncGetRole(int64_t rid) {
2,241,427✔
969
  int32_t    code = 0;
2,241,427✔
970
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,241,427✔
971
  if (pSyncNode == NULL) {
2,241,427✔
972
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
973
    if (terrno != 0) code = terrno;
×
974
    sError("sync Node Acquire error since %d", ERRNO);
×
975
    TAOS_RETURN(code);
×
976
  }
977

978
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
2,241,427✔
979

980
  syncNodeRelease(pSyncNode);
2,241,427✔
981
  return role;
2,241,427✔
982
}
983

984
int64_t syncGetTerm(int64_t rid) {
10,163,647✔
985
  int32_t    code = 0;
10,163,647✔
986
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
10,163,647✔
987
  if (pSyncNode == NULL) {
10,163,647✔
988
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
989
    if (terrno != 0) code = terrno;
×
990
    sError("sync Node Acquire error since %d", ERRNO);
×
991
    TAOS_RETURN(code);
×
992
  }
993

994
  int64_t term = raftStoreGetTerm(pSyncNode);
10,163,647✔
995

996
  syncNodeRelease(pSyncNode);
10,163,647✔
997
  return term;
10,163,647✔
998
}
999

1000
/*
 * Propose a client message on an already-acquired sync node.
 *
 * Rejections (returned as error codes):
 *   - TSDB_CODE_SYN_NOT_LEADER        node is neither leader nor assigned leader
 *   - TSDB_CODE_SYN_RESTORING         restore has not finished
 *   - TSDB_CODE_SYN_PROPOSE_NOT_READY heartbeat replies timed out (not checked for
 *                                     an assigned leader)
 *
 * Optimized path (syncNodeIsOptimizedOneReplica() is true): the request is handled
 * inline through syncNodeOnClientRequest(). Returns 1 when replicaNum is still 1,
 * 0 when replicaNum is no longer 1, and TSDB_CODE_SYN_INTERNAL_ERROR when the
 * inline handling fails.
 *
 * Normal path: a response stub is registered, the request is serialized and
 * enqueued through syncEqMsg. *seq (if not NULL) receives the stub sequence
 * number. Returns 0 on success or the serialization/enqueue error.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_RESTORING;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate value when
    // the callee fails before assigning an index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;

    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub registered above, but report the serialization failure itself:
    // returning the result of the cleanup would turn a failed propose into success
    // whenever the cleanup succeeds.
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1072

1073
/*
 * Reset a peer heartbeat timer to its initial, not-started state for destId:
 * no timer handle, counter and logic clock at 0, interval taken from the node's
 * hbBaseLine, callback set to syncNodeEqPeerHeartbeatTimer. Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // identity and callback
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  // scheduling state
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  sInfo("vgId:%d, HbTimer init, timerMs:%d for addr:0x%" PRIx64, pSyncNode->vgId, pSyncTimer->timerMS, destId.addr);
  return 0;
}
1084

1085
/* Set the interval, in milliseconds, stored in a heartbeat timer. */
static void syncHBSetTimerMS(SSyncTimer* pSyncTimer, int32_t ms) {
  pSyncTimer->timerMS = ms;
}
×
1086

1087
/*
 * Arm a peer heartbeat timer.
 *
 * Reuses the timer data referenced by pSyncTimer->hbDataRid when it can be
 * acquired; otherwise allocates and registers a new one. The data is filled from
 * the timer and the node, then the timer is (re)scheduled through
 * taosTmrResetPriority() with the data rid as its parameter.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is
 * not initialized, and TSDB_CODE_OUT_OF_MEMORY when the timer data cannot be
 * allocated.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return code;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // Previously the allocation result was dereferenced unchecked.
      code = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return code;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sInfo("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
        pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sWarn("vgId:%d, reset hb timer stopped:%d", pSyncNode->vgId, stopped);
  }

  return code;
}
1119

1120
/*
 * Stop a peer heartbeat timer: advance its logic clock, stop and clear the timer
 * handle, and remove the timer data referenced by hbDataRid (which is then set
 * to -1). Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1130

1131
/*
 * Restore the log store from the FSM snapshot when the two are out of step.
 *
 * A restore is triggered when the snapshot's lastApplyIndex lies beyond the last
 * log index, when the first log index is more than one past lastApplyIndex, or
 * when fsmState is SYNC_FSM_STATE_INCOMPLETE.
 *
 * Returns 0 when nothing had to be done or the restore succeeded,
 * TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the FSM or its
 * FpGetSnapshotInfo callback is missing, and the restore error otherwise.
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  int32_t code = 0;

  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool needRestore =
      (lastVer < commitIndex || firstVer > commitIndex + 1) || pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;
  if (!needRestore) {
    TAOS_RETURN(code);
  }

  sInfo("vgId:%d, restore log store from snapshot, firstVer:%" PRId64 ", lastVer:%" PRId64 ", commitIndex:%" PRId64,
        pNode->vgId, firstVer, lastVer, commitIndex);

  code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
  }
  TAOS_RETURN(code);
}
1163

1164
// open/close --------------
1165
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t electInterval, int32_t heartbeatInterval) {
4,560,495✔
1166
  int32_t    code = 0;
4,560,495✔
1167
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
4,560,495✔
1168
  if (pSyncNode == NULL) {
4,561,169✔
1169
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1170
    goto _error;
×
1171
  }
1172

1173
  if (!taosDirExist((char*)(pSyncInfo->path))) {
4,561,169✔
1174
    if (taosMkDir(pSyncInfo->path) != 0) {
3,322,808✔
1175
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1176
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
×
1177
      goto _error;
×
1178
    }
1179
  }
1180

1181
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
4,559,663✔
1182
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
4,560,430✔
1183
           TD_DIRSEP);
1184
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
4,560,778✔
1185

1186
  if (!taosCheckExistFile(pSyncNode->configPath)) {
4,560,253✔
1187
    // create a new raft config file
1188
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
3,322,631✔
1189
    pSyncNode->vgId = pSyncInfo->vgId;
3,323,370✔
1190
    pSyncNode->mountVgId = pSyncInfo->mountVgId;
3,322,808✔
1191
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
3,322,069✔
1192
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
3,322,808✔
1193
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
3,322,069✔
1194
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
3,322,808✔
1195
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
3,322,808✔
1196
    pSyncNode->raftCfg.configIndexCount = 1;
3,322,808✔
1197
    pSyncNode->raftCfg.configIndexArr[0] = -1;
3,322,808✔
1198

1199
    if ((code = syncWriteCfgFile(pSyncNode, "new")) != 0) {
3,322,808✔
1200
      terrno = code;
×
1201
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
×
1202
      goto _error;
×
1203
    }
1204
  } else {
1205
    // update syncCfg by raft_config.json
1206
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
1,238,361✔
1207
      terrno = code;
×
1208
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
×
1209
      goto _error;
×
1210
    }
1211

1212
    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
1,238,361✔
1213
      sInfo("vgId:%d, is going to judge update, in SyncInfo, totalReplicaNum:%d", pSyncNode->vgId,
906,636✔
1214
            pSyncInfo->syncCfg.totalReplicaNum);
1215
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
906,636✔
1216
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
812,574✔
1217
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
812,574✔
1218
        if ((code = syncWriteCfgFile(pSyncNode, "changed")) != 0) {
812,574✔
1219
          terrno = code;
×
1220
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
×
1221
          goto _error;
×
1222
        }
1223
      } else {
1224
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
94,062✔
1225
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
94,062✔
1226
      }
1227
    } else {
1228
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
331,725✔
1229
            pSyncInfo->syncCfg.changeVersion);
1230
    }
1231
  }
1232

1233
  // init by SSyncInfo
1234
  pSyncNode->vgId = pSyncInfo->vgId;
4,560,993✔
1235
  pSyncNode->mountVgId = pSyncInfo->mountVgId;
4,560,391✔
1236
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
4,561,097✔
1237
  bool      updated = false;
4,561,169✔
1238
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
4,561,169✔
1239
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
1240
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
12,118,882✔
1241
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
7,557,713✔
1242
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
7,557,713✔
1243
      updated = true;
×
1244
    }
1245
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
7,557,537✔
1246
          pNode->nodeId, pNode->clusterId);
1247
  }
1248

1249
  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
4,561,169✔
1250
    if (updated) {
440,785✔
1251
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
×
1252
      if ((code = syncWriteCfgFile(pSyncNode, "updated")) != 0) {
×
1253
        terrno = code;
×
1254
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
×
1255
        goto _error;
×
1256
      }
1257
    }
1258
  }
1259

1260
  pSyncNode->pWal = pSyncInfo->pWal;
4,561,169✔
1261
  pSyncNode->msgcb = pSyncInfo->msgcb;
4,561,169✔
1262
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
4,561,169✔
1263
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
4,561,169✔
1264
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
4,561,169✔
1265

1266
  // create raft log ring buffer
1267
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
4,561,169✔
1268
  if (pSyncNode->pLogBuf == NULL) {
4,561,169✔
1269
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
×
1270
    goto _error;
×
1271
  }
1272

1273
  // init replicaNum, replicasId
1274
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
4,561,169✔
1275
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
4,561,169✔
1276
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
12,118,882✔
1277
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
7,557,713✔
1278
        false) {
1279
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
×
1280
      goto _error;
×
1281
    }
1282
  }
1283

1284
  // init internal
1285
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
4,561,169✔
1286
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
4,561,169✔
1287

1288
  // init peersNum, peers, peersId
1289
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
4,561,169✔
1290
  int32_t j = 0;
4,560,583✔
1291
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
12,117,710✔
1292
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
7,557,713✔
1293
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
2,996,560✔
1294
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
2,996,560✔
1295
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
2,996,269✔
1296
      j++;
2,996,560✔
1297
    }
1298
  }
1299

1300
  pSyncNode->arbTerm = -1;
4,560,583✔
1301
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
4,560,583✔
1302
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
4,560,583✔
1303
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
4,560,583✔
1304

1305
  // init raft algorithm
1306
  pSyncNode->pFsm = pSyncInfo->pFsm;
4,560,583✔
1307
  pSyncInfo->pFsm = NULL;
4,561,169✔
1308
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
4,561,169✔
1309
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
4,561,169✔
1310
  pSyncNode->leaderCacheEp.port = 0;
4,561,169✔
1311
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';
4,561,169✔
1312

1313
  // init life cycle outside
1314

1315
  // TLA+ Spec
1316
  // InitHistoryVars == /\ elections = {}
1317
  //                    /\ allLogs   = {}
1318
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
1319
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
1320
  //                   /\ state       = [i \in Server |-> Follower]
1321
  //                   /\ votedFor    = [i \in Server |-> Nil]
1322
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
1323
  //                      /\ votesGranted   = [i \in Server |-> {}]
1324
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
1325
  // \* leader does not send itself messages. It's still easier to include these
1326
  // \* in the functions.
1327
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
1328
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
1329
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
1330
  //                /\ commitIndex  = [i \in Server |-> 0]
1331
  // Init == /\ messages = [m \in {} |-> 0]
1332
  //         /\ InitHistoryVars
1333
  //         /\ InitServerVars
1334
  //         /\ InitCandidateVars
1335
  //         /\ InitLeaderVars
1336
  //         /\ InitLogVars
1337
  //
1338

1339
  // init TLA+ server vars
1340
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
4,561,169✔
1341
  pSyncNode->roleTimeMs = taosGetTimestampMs();
4,561,169✔
1342
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
4,561,169✔
1343
    terrno = code;
×
1344
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
×
1345
    goto _error;
×
1346
  }
1347

1348
  // init TLA+ candidate vars
1349
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
4,561,169✔
1350
  if (pSyncNode->pVotesGranted == NULL) {
4,561,169✔
1351
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
×
1352
    goto _error;
×
1353
  }
1354
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
4,561,169✔
1355
  if (pSyncNode->pVotesRespond == NULL) {
4,560,043✔
1356
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
×
1357
    goto _error;
×
1358
  }
1359

1360
  // init TLA+ leader vars
1361
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
4,559,689✔
1362
  if (pSyncNode->pNextIndex == NULL) {
4,556,049✔
1363
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1364
    goto _error;
×
1365
  }
1366
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
4,560,120✔
1367
  if (pSyncNode->pMatchIndex == NULL) {
4,561,169✔
1368
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1369
    goto _error;
×
1370
  }
1371

1372
  // init TLA+ log vars
1373
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
4,560,402✔
1374
  if (pSyncNode->pLogStore == NULL) {
4,561,169✔
1375
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
×
1376
    goto _error;
×
1377
  }
1378

1379
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
4,559,886✔
1380
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
4,559,886✔
1381
    SSnapshot snapshot = {0};
4,560,402✔
1382
    // TODO check return value
1383
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
4,559,981✔
1384
    if (snapshot.lastApplyIndex > commitIndex) {
4,560,047✔
1385
      commitIndex = snapshot.lastApplyIndex;
758,254✔
1386
      sNTrace(pSyncNode, "reset commit index by snapshot");
758,254✔
1387
    }
1388
    pSyncNode->fsmState = snapshot.state;
4,560,047✔
1389
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
4,560,224✔
1390
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
×
1391
      if (pSyncNode->replicaNum == 1) {
×
1392
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1393
        goto _error;
×
1394
      }
1395
    }
1396
  }
1397
  pSyncNode->commitIndex = commitIndex;
4,558,341✔
1398
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
4,559,458✔
1399

1400
  // restore log store on need
1401
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
4,562,484✔
1402
    terrno = code;
×
1403
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
×
1404
    goto _error;
×
1405
  }
1406

1407
  // timer ms init
1408
  pSyncNode->pingBaseLine = PING_TIMER_MS;
4,561,169✔
1409
  pSyncNode->electBaseLine = electInterval;
4,561,169✔
1410
  pSyncNode->hbBaseLine = heartbeatInterval;
4,561,169✔
1411

1412
  // init ping timer
1413
  pSyncNode->pPingTimer = NULL;
4,561,169✔
1414
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
4,561,169✔
1415
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
4,560,748✔
1416
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
4,561,169✔
1417
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
4,561,169✔
1418
  pSyncNode->pingTimerCounter = 0;
4,561,169✔
1419

1420
  // init elect timer
1421
  pSyncNode->pElectTimer = NULL;
4,560,748✔
1422
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
4,561,169✔
1423
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
4,561,169✔
1424
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
4,561,169✔
1425
  pSyncNode->electTimerCounter = 0;
4,561,169✔
1426

1427
  // init heartbeat timer
1428
  pSyncNode->pHeartbeatTimer = NULL;
4,561,169✔
1429
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
4,561,169✔
1430
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
4,561,169✔
1431
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
4,561,169✔
1432
#ifdef BUILD_NO_CALL
1433
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
1434
#endif
1435
  pSyncNode->heartbeatTimerCounter = 0;
4,561,169✔
1436

1437
  // init peer heartbeat timer
1438
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
72,978,053✔
1439
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
68,416,884✔
1440
      terrno = code;
×
1441
      goto _error;
×
1442
    }
1443
  }
1444

1445
  // tools
1446
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
4,561,169✔
1447
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1448
    goto _error;
×
1449
  }
1450
  if (pSyncNode->pSyncRespMgr == NULL) {
4,561,169✔
1451
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1452
    goto _error;
×
1453
  }
1454

1455
  // restore state
1456
  pSyncNode->restoreFinish = false;
4,561,169✔
1457

1458
  // snapshot senders
1459
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
72,970,902✔
1460
    SSyncSnapshotSender* pSender = NULL;
68,409,733✔
1461
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
68,409,657✔
1462
    if (pSender == NULL) return NULL;
68,409,591✔
1463

1464
    pSyncNode->senders[i] = pSender;
68,409,591✔
1465
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
68,409,090✔
1466
  }
1467

1468
  // snapshot receivers
1469
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
4,561,169✔
1470
  if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
4,561,169✔
1471
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
4,561,169✔
1472
          pSyncNode->pNewNodeReceiver);
1473

1474
  // is config changing
1475
  pSyncNode->changing = false;
4,561,169✔
1476

1477
  // replication mgr
1478
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
4,561,169✔
1479
    terrno = code;
×
1480
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
×
1481
    goto _error;
×
1482
  }
1483

1484
  // peer state
1485
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
4,561,169✔
1486
    terrno = code;
×
1487
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
×
1488
    goto _error;
×
1489
  }
1490

1491
  //
1492
  // min match index
1493
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
4,561,169✔
1494

1495
  // start in syncNodeStart
1496
  // start raft
1497

1498
  int64_t timeNow = taosGetTimestampMs();
4,561,169✔
1499
  pSyncNode->startTime = timeNow;
4,561,169✔
1500
  pSyncNode->lastReplicateTime = timeNow;
4,561,169✔
1501

1502
  // snapshotting
1503
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
4,561,169✔
1504

1505
  // init log buffer
1506
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
4,561,169✔
1507
    terrno = code;
×
1508
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
×
1509
    goto _error;
×
1510
  }
1511

1512
  pSyncNode->isStart = true;
4,561,169✔
1513
  pSyncNode->electNum = 0;
4,561,169✔
1514
  pSyncNode->becomeLeaderNum = 0;
4,561,007✔
1515
  pSyncNode->becomeAssignedLeaderNum = 0;
4,561,007✔
1516
  pSyncNode->configChangeNum = 0;
4,561,169✔
1517
  pSyncNode->hbSlowNum = 0;
4,560,993✔
1518
  pSyncNode->hbrSlowNum = 0;
4,561,169✔
1519
  pSyncNode->tmrRoutineNum = 0;
4,561,169✔
1520

1521
  pSyncNode->snapSeq = -1;
4,561,007✔
1522

1523
  sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode,
4,561,169✔
1524
         pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout);
1525
  return pSyncNode;
4,561,169✔
1526

UNCOV
1527
_error:
×
UNCOV
1528
  if (pSyncInfo->pFsm) {
×
1529
    taosMemoryFree(pSyncInfo->pFsm);
×
1530
    pSyncInfo->pFsm = NULL;
×
1531
  }
1532
  syncNodeClose(pSyncNode);
×
1533
  pSyncNode = NULL;
×
1534
  return NULL;
×
1535
}
1536

1537
#ifdef BUILD_NO_CALL
1538
// Raise commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is ahead.
// Does nothing when no FSM or no FpGetSnapshotInfo callback is installed.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL) return;
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) return;

  SSnapshot snap = {0};
  // the callback's return value is deliberately ignored
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (snap.lastApplyIndex <= pSyncNode->commitIndex) return;
  pSyncNode->commitIndex = snap.lastApplyIndex;
}
1547
#endif
1548

1549
// Restore a sync node: check that the log store and the log buffer exist, raise
// commitIndex to the log store's commit index, and - unless the fsm state is
// SYNC_FSM_STATE_INCOMPLETE - commit the log buffer up to that index.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or log buffer is missing.
// Otherwise returns the result of syncLogBufferCommit when it is called, or the
// locally computed code (0, or TSDB_CODE_WAL_LOG_INCOMPLETE when the buffer end does
// not line up with the last log index) when the commit is skipped.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // take the three indexes while holding the log buffer mutex
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Print the code assigned just above: terrno is not set on this path, so terrstr()
    // would describe whatever error happened to be recorded earlier.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // no early return here: the mismatch is only warned about and restore goes on
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    // the commit result (success or failure) replaces any code computed above
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore");
  }

  TAOS_RETURN(code);
}
1584

1585
// Put the node into its initial raft role and start the ping timer.
//  - learner role in the config  -> become learner
//  - single replica              -> bump the term, become leader, append a noop entry
//  - otherwise                   -> become follower with an empty leader id
// Returns the result of syncNodeStartPingTimer, or the error propagated by
// TAOS_CHECK_RETURN from syncNodeAppendNoop.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum != 1) {
    SRaftId emptyId = {0};
    syncNodeBecomeFollower(pSyncNode, emptyId, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  }

  int32_t rc = syncNodeStartPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(rc));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return rc;
}
1611

1612
#ifdef BUILD_NO_CALL
1613
// Start the node as a follower with the elect timer armed at TIMER_MAX_MS, then start
// the ping timer. Returns -1 when either timer call reports a negative code, the error
// propagated by TAOS_CHECK_RETURN when stopping the heartbeat timer fails, otherwise
// the ping timer result.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t rc = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (rc < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  rc = syncNodeStartPingTimer(pSyncNode);
  if (rc < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  return rc;
}
1636
#endif
1637

1638
// First phase of closing a sync node: stop the elect, heartbeat and ping timers (in that
// order) and clean pending responses. Bails out with an error log when the node, its fsm
// or the fsm's FpApplyQueueItems callback is NULL, or when stopping any timer fails.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1674

1675
// Stop (if started) and destroy the node's new-node snapshot receiver, then clear the
// pointer. No-op when there is no receiver.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) return;

  bool running = snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver);
  if (running) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
4,120,384✔
1687

1688
// Release a heartbeat timer data object.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
1,011,778✔
1689

1690
// Tear down a sync node and free it. No-op for NULL.
// Timers are stopped first (ping, elect, heartbeat); if stopping one fails the function
// logs the error and returns without releasing anything further. After that the
// replication managers, response manager, vote/index managers, log store, log buffer,
// snapshot senders, new-node receiver, fsm and raft store are released, then the node.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync node close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  // destroy each owned component and clear its pointer
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders: stop the ones still running, then destroy
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[slot] = NULL;
  }

  // new-node snapshot receiver, if syncNodePostClose has not already destroyed it
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1759

1760
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1761

1762
// timer control --------------
1763
// (Re)arm the ping timer with pingTimerMS and publish the user logic clock as the
// timer logic clock. Only logs an error when the sync env is not initialised or the
// timer reset reports "stopped". Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d, stopped:%d", pSyncNode->vgId, pSyncNode->pingTimerMS,
           stopped);
  }
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1778

1779
// Advance the ping timer's user logic clock, stop the timer and clear its handle.
// A failed stop is only warned about. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool stop = taosTmrStop(pSyncNode->pPingTimer);
  if (stop) {
    sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stop);
  } else {
    sWarn("vgId:%d, failed to stop ping timer, maybe it's already stopped, stop:%d", pSyncNode->vgId, stop);
  }

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1791

1792
// Arm the election timer to fire after `ms` milliseconds. Records the expected execute
// time and the current elect logic clock in electTimerParam before resetting the timer.
// Only logs when the sync env is not initialised or the reset reports "stopped".
// Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1811

1812
// Advance the elect logic clock, stop the election timer and clear its handle.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool stop = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1821

1822
// Stop the election timer, then start it again with a timeout of `ms` milliseconds.
// Returns 0, or the first error propagated by TAOS_CHECK_RETURN.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1828

1829
// Restart the election timer with a fresh timeout: TIMER_MAX_MS for a standby node,
// otherwise a random value drawn from [electBaseLine, 2 * electBaseLine] via
// syncUtilElectRandomMS. A restart failure is only warned about.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t timeoutMs = pSyncNode->raftCfg.isStandBy
                          ? TIMER_MAX_MS
                          : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, timeoutMs);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          timeoutMs);
}
1847

1848
#ifdef BUILD_NO_CALL
1849
// (Re)arm the node-level heartbeat timer with heartbeatTimerMS and publish the user
// logic clock as the timer logic clock. Only compiled under BUILD_NO_CALL.
// Always returns 0; problems are logged.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    // taosTmrReset yields a bool "stopped" flag (see syncNodeStartElectTimer), not an
    // error code, so it must not go through TAOS_CHECK_RETURN: that turned `true` into
    // a bogus return value of 1 and skipped the logic clock update below.
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sWarn("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1862
#endif
1863

1864
// Start the per-peer heartbeat timer of every peer that has one.
// Returns 0, or the first error propagated by TAOS_CHECK_RETURN from syncHbTimerStart.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;  // no heartbeat timer registered for this peer

    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pTimer));
  }

  return 0;
}
1881

1882
// Apply `ms` via syncHBSetTimerMS to the heartbeat timer of every peer that has one.
// Always returns 0.
int32_t syncNodeSetHeartbeatTimerMs(SSyncNode* pSyncNode, int32_t ms) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;

    syncHBSetTimerMS(pTimer, ms);
  }

  return 0;
}
1894

1895
// Stop the per-peer heartbeat timer of every peer that has one.
// Returns 0, or the first error propagated by TAOS_CHECK_RETURN from syncHbTimerStop.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;  // no heartbeat timer registered for this peer

    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pTimer));
  }

  return 0;
}
1914

1915
// Set a new heartbeat interval on all peer timers; when this node is currently the
// leader, also stop and start the heartbeat timers again.
// Returns 0, or the first error propagated by TAOS_CHECK_RETURN.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode, int32_t heartbeatInterval) {
  sInfo("vgId:%d, sync Node Restart HeartbeatTimer, state=%d", pSyncNode->vgId, pSyncNode->state);
  TAOS_CHECK_RETURN(syncNodeSetHeartbeatTimerMs(pSyncNode, heartbeatInterval));

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1926

1927
// Send `pMsg` to the peer whose raft address equals destRaftId->addr.
// The message is converted with syncUtilMsgHtoN, marked noResp and handed to the
// node's syncSendMSg callback. When no matching peer is found, no callback is set, or
// the callback returns a negative code, the error is logged and pMsg->pCont is freed
// and cleared. Returns the callback's result, or -1 when nothing was sent.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the destination peer
  SEpSet* pTarget = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pTarget = &pNode->peersEpset[peer];
      break;
    }
  }

  int32_t code = -1;
  if (pTarget != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pTarget, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           pTarget, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  TAOS_RETURN(code);
}
1952

1953
// Tell whether this node appears in `pCfg`.
// Membership is checked twice - by fqdn + port and by raft id - and the two answers
// must agree; if they differ, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and false
// is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  // check 1: endpoint (fqdn and port) match
  bool foundByEndpoint = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    bool sameFqdn = strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0;
    if (sameFqdn && pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort) {
      foundByEndpoint = true;
      break;
    }
  }

  // check 2: raft id match
  bool foundByRaftId = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      foundByRaftId = true;
      break;
    }
  }

  if (foundByEndpoint != foundByRaftId) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return foundByEndpoint;
}
1983

1984
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
1,190,596✔
1985
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1,190,596✔
1986
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
655,872✔
1987
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1,729,846✔
1988
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
1,398,498✔
1989
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
1,398,498✔
1990
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
1,398,498✔
1991
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
1,398,498✔
1992
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1,398,498✔
1993
  }
1994

1995
  return false;
331,348✔
1996
}
1997

1998
// Apply a new raft configuration to the sync node and persist it.
//
// When the config is unchanged (per syncIsConfigChanged) nothing happens and 0 is
// returned. Otherwise the new config and lastConfigChangeIndex are stored, the config
// index is appended to configIndexArr, and - if this node is part of the new config -
// the node info, peers, replica ids, quorum, index/vote managers and snapshot senders
// are rebuilt before the config file is written.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when configIndexArr is
// full; terrno when a node info cannot be converted to a raft id; or the error
// propagated by TAOS_CHECK_RETURN from snapshotSenderCreate / syncWriteCfgFile.
// NOTE(review): on those error returns the in-memory raftCfg already holds the new
// configuration - confirm callers tolerate that.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the real vgroup id instead of a hard-coded "vgId:1"
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node is being dropped when it was in the old config but is not in the new one
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // Arguments now line up with the conversions: two replica counts (%d), two lastIndex
  // values (PRId64), then the new replica number (%d). Previously vgId was passed
  // first, which shifted every argument by one and mismatched the specifiers.
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: carry over the old sender of every replica that is still present
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; keeps it out of the "free old" pass
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          // The sender is NULL on this path, so log through the node rather than
          // handing a NULL sender to the sender-scoped log macro.
          sNError(pSyncNode, "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not carried over above
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode, "config_change_with_new_members"));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode, "config_change_no_new_members"));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2170

2171
// raft state change --------------
2172
// Raise the stored raft term to `term` when `term` is newer than the stored one.
// The node's state field is not touched here.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  if (term <= storedTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
237,478✔
2177

2178
// Step down to follower in response to a message carrying `newTerm`.
//
//   pSyncNode  node to step down
//   newTerm    term observed from the peer; ignored when older than the stored term
//   id         raft id handed to syncNodeBecomeFollower as the new leader cache
//   strFrom    short reason text appended to the log/debug string
//
// When newTerm is newer, the stored term is updated, the node becomes follower and
// its vote is cleared. When the terms are equal, the node only becomes follower if
// it is not one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id, char* strFrom) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  // an assigned leader regenerates its arb token before stepping down
  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, generate arb token, will step down from assigned leader, new arbToken:%s", pSyncNode->vgId,
          pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  // Large enough for the fixed text, two 64-bit terms and a reasonable reason string;
  // the previous 64-byte buffer cut the reason short.
  char reason[256];

  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    snprintf(reason, sizeof(reason), "step down, with same term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, reason);
  }
}
2213

2214
// Run syncRespCleanRsp on the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
2,168,681✔
2215

2216
// Switch the node to follower state.
//
//   pSyncNode  node whose role changes
//   leaderId   raft id stored as the new leader cache; when it matches one of the
//              configured replicas, that replica's endpoint is cached as well
//   debugStr   free-form reason, used only for logging
//
// Returns early (after logging) if stopping the heartbeat timer or resetting the
// log buffer fails; in that case the later steps are skipped.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->leaderCache = leaderId;

  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
    if (syncUtilSameId(&pSyncNode->replicasId[i], &leaderId)) {
      pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[i].nodePort;
      // tstrncpy (already used in this file) instead of strncpy, which leaves the
      // destination unterminated when the source fills the whole buffer
      tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
      break;
    }
  }
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2267

2268
// Switch the node to learner state: reset the slow-heartbeat counter, record the
// role change time, notify the FSM (if it registered a callback), invalidate the
// min match index and reset the log buffer. `debugStr` is only used for logging.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // role transition
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  // notify the state machine
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // the log buffer reset is the last step; a failure is only logged
  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2293

2294
// TLA+ Spec
2295
// \* Candidate i transitions to leader.
2296
// BecomeLeader(i) ==
2297
//     /\ state[i] = Candidate
2298
//     /\ votesGranted[i] \in Quorum
2299
//     /\ state'      = [state EXCEPT ![i] = Leader]
2300
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2301
//                          [j \in Server |-> Len(log[i]) + 1]]
2302
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2303
//                          [j \in Server |-> 0]]
2304
//     /\ elections'  = elections \cup
2305
//                          {[eterm     |-> currentTerm[i],
2306
//                            eleader   |-> i,
2307
//                            elog      |-> log[i],
2308
//                            evotes    |-> votesGranted[i],
2309
//                            evoterLog |-> voterLog[i]]}
2310
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2311
//
2312
// Switch the node to leader state.
//
// Steps, in order: bump counters, mark restore as unfinished, set state and role
// time, point the leader cache at this node, set every nextIndex to lastIndex + 1,
// invalidate every matchIndex, reset peer send state, stop a running snapshot
// receiver, stop the elect timer, start the heartbeat timer, heartbeat all peers,
// invoke the FSM callback, invalidate minMatchIndex and reset the log buffer.
//
// Any failing step is logged and aborts the remaining steps (the function returns
// void, so the caller is not informed). `debugStr` is only used for logging.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  // tstrncpy (already used in this file) instead of strncpy, which leaves the
  // destination unterminated when the source fills the whole buffer
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
           TSDB_FQDN_LEN);
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    SyncIndex lastIndex = SYNC_INDEX_INVALID;
    SyncTerm  lastTerm = 0;
    // reuse the function-level `code` rather than shadowing it
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "node become leader, %s", debugStr);
}
2406

2407
// Switch the node to assigned-leader state.
//
// Performs these steps in order: replication indexes, peer state, snapshot
// receiver, elect/heartbeat timers, immediate heartbeat, FSM callback, log buffer
// reset. Any failing step is logged and aborts the remaining steps.
// NOTE: restoreFinish is not reset here, and the leader endpoint cache
// (leaderCacheEp) is not written.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;

  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // role transition
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // this node is now the cached leader
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // every replica's next index starts right after our last index
  for (int32_t replicaIdx = 0; replicaIdx < pSyncNode->pNextIndex->replicaNum; ++replicaIdx) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[replicaIdx] = lastIndex + 1;
  }

  // match indexes are unknown until peers reply (our own slot is overwritten too)
  for (int32_t replicaIdx = 0; replicaIdx < pSyncNode->pMatchIndex->replicaNum; ++replicaIdx) {
    pSyncNode->pMatchIndex->index[replicaIdx] = SYNC_INDEX_INVALID;
  }

  // peer send state
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // stop the snapshot receiver if it is running
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // timers: election off, heartbeat on
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // heartbeat peers immediately
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // notify the state machine
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // log buffer reset
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNInfo(pSyncNode, "become assigned leader");
}
2485

2486
// Promote a candidate to leader.
// Does nothing (besides logging an error) unless the node is a candidate and has
// been granted votes by a majority. After becoming leader a noop entry is
// appended; a failure to append is logged but does not undo the promotion.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "from candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t appendCode = syncNodeAppendNoop(pSyncNode);
  if (appendCode < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);
}
2510

2511
// Returns true when the node's vgId is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}
73,291,702✔
2512

2513
int32_t syncSetElectBaseline(int64_t rid, int32_t ms){
28,490✔
2514
  int32_t code = 0;
28,490✔
2515
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
28,490✔
2516
  if (pSyncNode == NULL) {
28,490✔
2517
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
2518
    if (terrno != 0) code = terrno;
×
2519
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
2520
    TAOS_RETURN(code);
×
2521
  }
2522
  pSyncNode->electBaseLine = ms;
28,490✔
2523
  syncNodeResetElectTimer(pSyncNode);
28,490✔
2524
  return code;
28,490✔
2525
}
2526

2527
// Reset the send bookkeeping of every peer slot (voters plus learners):
// lastSendIndex becomes SYNC_INDEX_INVALID and lastSendTime becomes 0.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t peerSlots = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < peerSlots) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}
2535

2536
// Turn a follower into a candidate and record the role change time.
// Logs an error and leaves the node untouched when it is not a follower.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
    pSyncNode->roleTimeMs = taosGetTimestampMs();

    SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
          pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

    sNTrace(pSyncNode, "follower to candidate");
  } else {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
  }
}
2549

2550
// Promote an assigned leader to a regular leader and append a noop entry.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not an assigned leader,
// otherwise 0. A failure to append the noop entry is logged but still yields 0.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // ", " added before "assigned commit index": the two fields used to run together
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2568

2569
// just called by syncNodeVoteForSelf
2570
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
555,637✔
2571
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
555,637✔
2572
  if (term != storeTerm) {
555,637✔
2573
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2574
    return;
×
2575
  }
2576
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
555,637✔
2577
  bool voted = raftStoreHasVoted(pSyncNode);
555,637✔
2578
  if (voted) {
555,637✔
2579
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2580
    return;
×
2581
  }
2582

2583
  raftStoreVote(pSyncNode, pRaftId);
555,637✔
2584
}
2585

2586
// simulate get vote from outside
2587
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
555,637✔
2588
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
555,637✔
2589

2590
  SRpcMsg rpcMsg = {0};
555,637✔
2591
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
555,637✔
2592
  if (ret != 0) return;
555,637✔
2593

2594
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
555,637✔
2595
  pMsg->srcId = pSyncNode->myRaftId;
555,637✔
2596
  pMsg->destId = pSyncNode->myRaftId;
555,637✔
2597
  pMsg->term = currentTerm;
555,637✔
2598
  pMsg->voteGranted = true;
555,637✔
2599

2600
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
555,637✔
2601
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
555,637✔
2602
  rpcFreeCont(rpcMsg.pCont);
555,637✔
2603
}
2604

2605
// return if has a snapshot
2606
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
6,490,237✔
2607
  bool      ret = false;
6,490,237✔
2608
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
6,490,237✔
2609
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
6,490,275✔
2610
    // TODO check return value
2611
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
6,491,818✔
2612
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
6,491,874✔
2613
      ret = true;
1,423,303✔
2614
    }
2615
  }
2616
  return ret;
6,491,479✔
2617
}
2618

2619
// return max(logLastIndex, snapshotLastIndex)
2620
// if no snapshot and log, return -1
2621
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
6,512,076✔
2622
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
6,512,076✔
2623
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
6,512,280✔
2624
    // TODO check return value
2625
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
6,510,905✔
2626
  }
2627
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
6,511,653✔
2628

2629
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
6,511,677✔
2630
  return lastIndex;
6,511,677✔
2631
}
2632

2633
// return the last term of snapshot and log
2634
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2635
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
6,491,627✔
2636
  SyncTerm lastTerm = 0;
6,491,627✔
2637
  if (syncNodeHasSnapshot(pSyncNode)) {
6,491,627✔
2638
    // has snapshot
2639
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1,423,303✔
2640
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
1,423,303✔
2641
      // TODO check return value
2642
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
1,423,303✔
2643
    }
2644

2645
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
1,423,303✔
2646
    if (logLastIndex > snapshot.lastApplyIndex) {
1,422,980✔
2647
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,022,127✔
2648
    } else {
2649
      lastTerm = snapshot.lastApplyTerm;
400,853✔
2650
    }
2651

2652
  } else {
2653
    // no snapshot
2654
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
5,068,571✔
2655
  }
2656

2657
  return lastTerm;
6,488,576✔
2658
}
2659

2660
// get last index and term along with snapshot
2661
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
5,549,732✔
2662
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
5,549,732✔
2663
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
5,549,854✔
2664
  return 0;
5,547,163✔
2665
}
2666

2667
#ifdef BUILD_NO_CALL
2668
// return append-entries first try index
2669
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2670
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2671
  return syncStartIndex;
2672
}
2673

2674
// if index > 0, return index - 1
2675
// else, return -1
2676
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2677
  SyncIndex preIndex = index - 1;
2678
  if (preIndex < SYNC_INDEX_INVALID) {
2679
    preIndex = SYNC_INDEX_INVALID;
2680
  }
2681

2682
  return preIndex;
2683
}
2684

2685
// if index < 0, return SYNC_TERM_INVALID
2686
// if index == 0, return 0
2687
// if index > 0, return preTerm
2688
// if error, return SYNC_TERM_INVALID
2689
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2690
  if (index < SYNC_INDEX_BEGIN) {
2691
    return SYNC_TERM_INVALID;
2692
  }
2693

2694
  if (index == SYNC_INDEX_BEGIN) {
2695
    return 0;
2696
  }
2697

2698
  SyncTerm  preTerm = 0;
2699
  SyncIndex preIndex = index - 1;
2700

2701
  SSyncRaftEntry* pPreEntry = NULL;
2702
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2703
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2704
  int32_t         code = 0;
2705
  if (h) {
2706
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2707
    code = 0;
2708

2709
    pSyncNode->pLogStore->cacheHit++;
2710
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2711

2712
  } else {
2713
    pSyncNode->pLogStore->cacheMiss++;
2714
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2715

2716
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2717
  }
2718

2719
  SSnapshot snapshot = {.data = NULL,
2720
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2721
                        .lastApplyTerm = SYNC_TERM_INVALID,
2722
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2723

2724
  if (code == 0) {
2725
    if (pPreEntry == NULL) return -1;
2726
    preTerm = pPreEntry->term;
2727

2728
    if (h) {
2729
      taosLRUCacheRelease(pCache, h, false);
2730
    } else {
2731
      syncEntryDestroy(pPreEntry);
2732
    }
2733

2734
    return preTerm;
2735
  } else {
2736
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2737
      // TODO check return value
2738
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2739
      if (snapshot.lastApplyIndex == preIndex) {
2740
        return snapshot.lastApplyTerm;
2741
      }
2742
    }
2743
  }
2744

2745
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2746
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2747
  return SYNC_TERM_INVALID;
2748
}
2749

2750
// get pre index and term of "index"
2751
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2752
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2753
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2754
  return 0;
2755
}
2756
#endif
2757

2758
static void syncNodeEqPingTimer(void* param, void* tmrId) {
33,591,416✔
2759
  if (!syncIsInit()) return;
33,591,416✔
2760

2761
  int64_t    rid = (int64_t)param;
33,591,416✔
2762
  SSyncNode* pNode = syncNodeAcquire(rid);
33,591,416✔
2763

2764
  if (pNode == NULL) return;
33,591,416✔
2765

2766
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
33,591,416✔
2767
    SRpcMsg rpcMsg = {0};
33,591,416✔
2768
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
33,591,416✔
2769
                                    pNode->pingTimerMS, pNode);
2770
    if (code != 0) {
33,591,316✔
2771
      sError("failed to build ping msg");
×
2772
      rpcFreeCont(rpcMsg.pCont);
×
2773
      goto _out;
×
2774
    }
2775

2776
    // sTrace("enqueue ping msg");
2777
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
33,591,316✔
2778
    if (code != 0) {
33,591,416✔
2779
      sError("failed to sync enqueue ping msg since %s", terrstr());
4,324✔
2780
      rpcFreeCont(rpcMsg.pCont);
4,324✔
2781
      goto _out;
4,324✔
2782
    }
2783

2784
  _out:
33,591,403✔
2785
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
33,591,416✔
2786
                     &pNode->pPingTimer))
2787
      sError("failed to reset ping timer");
×
2788
  }
2789
  syncNodeRelease(pNode);
33,591,416✔
2790
}
2791

2792
static void syncNodeEqElectTimer(void* param, void* tmrId) {
569,507✔
2793
  if (!syncIsInit()) return;
569,507✔
2794

2795
  int64_t    rid = (int64_t)param;
569,507✔
2796
  SSyncNode* pNode = syncNodeAcquire(rid);
569,507✔
2797

2798
  if (pNode == NULL) return;
569,507✔
2799

2800
  if (pNode->syncEqMsg == NULL) {
566,836✔
2801
    syncNodeRelease(pNode);
×
2802
    return;
×
2803
  }
2804

2805
  int64_t tsNow = taosGetTimestampMs();
566,836✔
2806
  if (tsNow < pNode->electTimerParam.executeTime) {
566,836✔
2807
    syncNodeRelease(pNode);
1,406✔
2808
    return;
1,406✔
2809
  }
2810

2811
  SRpcMsg rpcMsg = {0};
565,430✔
2812
  int32_t code =
2813
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
565,430✔
2814

2815
  if (code != 0) {
565,430✔
2816
    sError("failed to build elect msg");
×
2817
    syncNodeRelease(pNode);
×
2818
    return;
×
2819
  }
2820

2821
  SyncTimeout* pTimeout = rpcMsg.pCont;
565,430✔
2822
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
565,430✔
2823

2824
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
565,430✔
2825
  if (code != 0) {
565,430✔
2826
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2827
    rpcFreeCont(rpcMsg.pCont);
×
2828
    syncNodeRelease(pNode);
×
2829
    return;
×
2830
  }
2831

2832
  syncNodeRelease(pNode);
565,430✔
2833
}
2834

2835
#ifdef BUILD_NO_CALL
2836
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2837
  if (!syncIsInit()) return;
2838

2839
  int64_t    rid = (int64_t)param;
2840
  SSyncNode* pNode = syncNodeAcquire(rid);
2841

2842
  if (pNode == NULL) return;
2843

2844
  if (pNode->totalReplicaNum > 1) {
2845
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2846
      SRpcMsg rpcMsg = {0};
2847
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2848
                                      pNode->heartbeatTimerMS, pNode);
2849

2850
      if (code != 0) {
2851
        sError("failed to build heartbeat msg");
2852
        goto _out;
2853
      }
2854

2855
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2856
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2857
      if (code != 0) {
2858
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2859
        rpcFreeCont(rpcMsg.pCont);
2860
        goto _out;
2861
      }
2862

2863
    _out:
2864
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2865
                       &pNode->pHeartbeatTimer) != 0)
2866
        return;
2867

2868
    } else {
2869
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2870
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2871
    }
2872
  }
2873
}
2874
#endif
2875

2876
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
26,445,467✔
2877
  if (tsSyncLogHeartbeat) {
26,445,467✔
2878
    sInfo("heartbeat timer start");
×
2879
  }
2880
  int32_t code = 0;
26,445,467✔
2881
  int64_t hbDataRid = (int64_t)param;
26,445,467✔
2882
  int64_t tsNow = taosGetTimestampMs();
26,445,467✔
2883

2884
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
26,445,467✔
2885
  if (pData == NULL) {
26,445,467✔
2886
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2887
    return;
×
2888
  }
2889

2890
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
26,445,467✔
2891
  if (pSyncNode == NULL) {
26,445,467✔
2892
    syncHbTimerDataRelease(pData);
278✔
2893
    sError("hb timer get pSyncNode NULL");
278✔
2894
    return;
278✔
2895
  }
2896

2897
  SSyncTimer* pSyncTimer = pData->pTimer;
26,445,189✔
2898

2899
  if (!pSyncNode->isStart) {
26,445,189✔
2900
    syncNodeRelease(pSyncNode);
×
2901
    syncHbTimerDataRelease(pData);
×
2902
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2903
    return;
×
2904
  }
2905

2906
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
26,445,189✔
2907
    syncNodeRelease(pSyncNode);
×
2908
    syncHbTimerDataRelease(pData);
×
2909
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2910
    return;
×
2911
  }
2912

2913
  if (tsSyncLogHeartbeat) {
26,445,189✔
2914
    sInfo("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
×
2915
          pData->destId.addr);
2916
  } else {
2917
    sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
26,445,189✔
2918
           pData->destId.addr);
2919
  }
2920

2921
  if (pSyncNode->totalReplicaNum > 1) {
26,445,189✔
2922
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
26,444,889✔
2923
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
26,444,889✔
2924

2925
    if (timerLogicClock == msgLogicClock) {
26,444,889✔
2926
      if (tsNow > pData->execTime) {
26,444,333✔
2927
        pData->execTime += pSyncTimer->timerMS;
26,435,184✔
2928

2929
        SRpcMsg rpcMsg = {0};
26,435,184✔
2930
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
26,435,184✔
2931
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2932
          syncNodeRelease(pSyncNode);
×
2933
          syncHbTimerDataRelease(pData);
×
2934
          return;
×
2935
        }
2936

2937
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
26,435,184✔
2938

2939
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
26,435,184✔
2940
        pSyncMsg->srcId = pSyncNode->myRaftId;
26,435,184✔
2941
        pSyncMsg->destId = pData->destId;
26,435,184✔
2942
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
26,435,184✔
2943
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
26,435,184✔
2944
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
26,435,184✔
2945
        pSyncMsg->privateTerm = 0;
26,435,184✔
2946
        pSyncMsg->timeStamp = tsNow;
26,435,184✔
2947

2948
        // update reset time
2949
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
26,435,184✔
2950
        pSyncTimer->timeStamp = tsNow;
26,435,184✔
2951

2952
        // send msg
2953
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
26,435,184✔
2954
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
26,435,184✔
2955
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime, &(rpcMsg.info.traceId));
26,435,184✔
2956
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
26,435,184✔
2957
        if (ret != 0) {
26,435,184✔
2958
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
7,387✔
2959
        }
2960
      }
2961

2962
      if (syncIsInit()) {
26,444,333✔
2963
        if (tsSyncLogHeartbeat) {
26,444,333✔
2964
          sInfo("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
×
2965
        } else {
2966
          sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
26,444,333✔
2967
        }
2968
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
26,444,333✔
2969
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
26,444,333✔
2970
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
26,444,333✔
2971

2972
      } else {
2973
        sError("sync env is stop, reset peer hb timer error");
×
2974
      }
2975

2976
    } else {
2977
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
556✔
2978
             timerLogicClock, msgLogicClock);
2979
    }
2980

2981
    if (tsSyncLogHeartbeat) {
26,444,889✔
2982
      sInfo("vgId:%d, finish send sync-heartbeat", pSyncNode->vgId);
×
2983
    }
2984
  }
2985

2986
  syncHbTimerDataRelease(pData);
26,445,189✔
2987
  syncNodeRelease(pSyncNode);
26,445,189✔
2988
  if (tsSyncLogHeartbeat) {
26,445,189✔
2989
    sInfo("heartbeat timer stop");
×
2990
  }
2991
}
2992

2993
#ifdef BUILD_NO_CALL
2994
/*
 * LRU cache deleter callback: releases the cached value.
 * The key, its length and the user data pointer are not used.
 */
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;
  taosMemoryFree(value);
}
2998

2999
/*
 * Insert a raft entry into the log store's LRU cache, keyed by the entry index.
 *
 * @param pLogStore log store whose pCache receives the entry
 * @param pEntry    entry to cache; charged as sizeof(*pEntry) + dataLen
 * @param h         out: handle filled in by taosLRUCacheInsert
 * @return 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
3013
#endif
3014

3015
/*
 * Fill a sync configuration from an alter-replica request.
 *
 * Voters (pReq->replicas) are placed first in cfg->nodeInfo, followed by the
 * learners (pReq->learnerReplicas). cfg->myIndex is only written when the
 * request carries a self index (voter or learner) different from -1.
 *
 * @param pReq decoded request  // TODO SAlterVnodeReplicaReq name is proper?
 * @param cfg  out: replicaNum, totalReplicaNum, nodeInfo[], myIndex, changeVersion
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int v = 0; v < pReq->replica; ++v) {
    SNodeInfo* pVoter = &cfg->nodeInfo[v];
    pVoter->nodeId = pReq->replicas[v].id;
    pVoter->nodePort = pReq->replicas[v].port;
    tstrncpy(pVoter->nodeFqdn, pReq->replicas[v].fqdn, sizeof(pVoter->nodeFqdn));
    pVoter->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pVoter->nodeId, &pVoter->clusterId, pVoter->nodeFqdn, &pVoter->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, v, pVoter->nodeFqdn,
          pVoter->nodePort, pVoter->nodeId, pVoter->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: slot index continues after the voters, source index restarts at 0
  int32_t learnerIdx = 0;
  for (int slot = cfg->replicaNum; slot < pReq->replica + pReq->learnerReplica; ++slot) {
    SNodeInfo* pLearner = &cfg->nodeInfo[slot];
    pLearner->nodeId = pReq->learnerReplicas[learnerIdx].id;
    pLearner->nodePort = pReq->learnerReplicas[learnerIdx].port;
    pLearner->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pLearner->nodeFqdn, pReq->learnerReplicas[learnerIdx].fqdn, sizeof(pLearner->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pLearner->nodeId, &pLearner->clusterId, pLearner->nodeFqdn, &pLearner->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pLearner->nodeFqdn,
          pLearner->nodePort, pLearner->nodeId, pLearner->nodeRole, update);
    learnerIdx++;
  }

  // total = learners copied above + voters declared by the request
  cfg->totalReplicaNum = learnerIdx + pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
×
3051

3052
/*
 * Inspect a config-change entry and step down if this node is a leader (or
 * assigned leader) that is no longer part of the new configuration.
 *
 * @param ths    sync node
 * @param pEntry raft entry; must carry TDMT_SYNC_CONFIG_CHANGE
 * @return TSDB_CODE_SYN_INTERNAL_ERROR for a wrong entry type,
 *         TSDB_CODE_INVALID_MSG when the payload cannot be decoded,
 *         1 when the node stepped down, 0 otherwise
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // payload layout: SMsgHead followed by the serialized request
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    int32_t code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  if (newCfg.totalReplicaNum < 1) {
    return 0;
  }
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return 0;
  }

  // still a member of the new configuration? then nothing to do
  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    const SNodeInfo* pCand = &newCfg.nodeInfo[k];
    if (strcmp(ths->myNodeInfo.nodeFqdn, pCand->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pCand->nodePort) {
      return 0;
    }
  }

  // leader removed from the group: give up leadership at the current term
  SyncTerm term = raftStoreGetTerm(ths);
  SRaftId  noId = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, term, noId, "changeConfig");
  return 1;
}
3090

3091
/*
 * Dump the node's current membership view to the info log: summary counters,
 * this node's info, every peer's node info / endpoint set / raft id, and every
 * entry of the persisted raft configuration with its replica id.
 *
 * @param ths sync node to describe
 * @param cfg not read by this function; kept for interface compatibility
 * @param str tag inserted in every log line (e.g. "before config change")
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  (void)cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  // Render each peer's own endpoint set as "{fqdn:port, fqdn:port}".
  // The endpoint loop uses its own index so that it does not shadow the peer
  // index; peersEpset[i] (not element 0) is the set belonging to peer i.
  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3137

3138
/*
 * Rebuild the peer tables and the persisted raft configuration from cfg.
 *
 * Every cfg entry whose fqdn/port differ from this node's becomes a peer
 * (node info, endpoint set and raft id). All cfg entries, including this node,
 * are then copied into ths->raftCfg.cfg, counting voters and recording myIndex.
 *
 * @return 0 on success, terrno when a peer's raft id cannot be determined
 *         (in that case peersNum and raftCfg.cfg are left as they were)
 */
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort;
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCnt];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCnt]);

    if (!syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCnt])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t slot = 0;
  while (slot < cfg->totalReplicaNum) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[slot];
    SNodeInfo*       pDst = &ths->raftCfg.cfg.nodeInfo[slot];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }
    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = slot;
    }
    slot++;
  }
  ths->raftCfg.cfg.totalReplicaNum = slot;

  return 0;
}
3186

3187
/*
 * Promote to voter every peer and every raft-config entry that cfg lists
 * (matched by fqdn and port) with role TAOS_SYNC_ROLE_VOTER, and recount
 * ths->raftCfg.cfg.replicaNum from those matches.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      const SNodeInfo* pNew = &cfg->nodeInfo[c];
      if (strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) != 0) continue;
      if (pPeer->nodePort != pNew->nodePort) continue;
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t r = 0; r < ths->raftCfg.cfg.totalReplicaNum; ++r) {
    SNodeInfo* pCur = &ths->raftCfg.cfg.nodeInfo[r];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      const SNodeInfo* pNew = &cfg->nodeInfo[c];
      if (strcmp(pCur->nodeFqdn, pNew->nodeFqdn) != 0) continue;
      if (pCur->nodePort != pNew->nodePort) continue;
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      pCur->nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3214

3215
/*
 * Rebuild all per-replica runtime state after ths->raftCfg.cfg has changed,
 * carrying over the state of replicas that exist in both the old and the new
 * membership (matched by raft id).
 *
 * Steps: replicasId, match index, next index, vote managers, log replication
 * managers, snapshot senders, heartbeat timers, peer states.
 *
 * @param ths                sync node whose raftCfg.cfg already holds the new membership
 * @param oldtotalReplicaNum number of replicas (voters + learners) before the change
 * @return 0 on success, otherwise an error code from the failing step
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // keep the previous manager: it is neither leaked nor replaced by NULL
    ths->pMatchIndex = oldIndex;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // keep the previous manager: it is neither leaked nor replaced by NULL
    ths->pNextIndex = oldNextIndex;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // snapshot of the old managers, taken by value before they are destroyed
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // The snapshot is not needed past this point. Releasing it here means none
  // of the later error returns can leak it.
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = code;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3366

3367
/*
 * Apply the voter count of ths->raftCfg.cfg to the runtime state after a
 * replica-type change: node replicaNum, match/next index managers and the
 * vote managers. Log replication managers are left untouched.
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId, only need to change replicaNum when 1->3
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  for (int32_t m = 0; m < ths->pMatchIndex->totalReplicaNum; ++m) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, m, ths->pMatchIndex->index[m]);
  }

  // pVotesGranted, pVotesRespond
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logRepMgrs: no need to change them when 1->3
}
×
3391

3392
/*
 * Zero the first peersNum entries of peersNodeInfo and the first
 * totalReplicaNum entries of raftCfg.cfg.nodeInfo. The counters themselves
 * are not modified here.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  const SNodeInfo blank = {0};

  int32_t p = 0;
  while (p < ths->peersNum) {
    ths->peersNodeInfo[p] = blank;
    ++p;
  }

  int32_t r = 0;
  while (r < ths->raftCfg.cfg.totalReplicaNum) {
    ths->raftCfg.cfg.nodeInfo[r] = blank;
    ++r;
  }
}
×
3402

3403
/*
 * Apply a config-change raft entry to this node.
 *
 * The new membership is decoded from the entry. Entries whose changeVersion is
 * not greater than the current one are skipped. Otherwise one of three cases
 * is handled:
 *   - new total of 1 or 2 replicas: replica removal (of a peer, or of this node,
 *     which then becomes a learner with an empty peer list);
 *   - node currently has 3 replicas: replica type change (learner -> voter);
 *   - anything else: replica addition.
 * Finally quorum and config indexes are updated and the config file is written.
 *
 * @param ths    sync node
 * @param pEntry raft entry; must carry TDMT_SYNC_CONFIG_CHANGE
 * @param str    origin tag used in logs; "Commit" selects the commit-style log line
 * @return 0 on success or skip, otherwise an error code
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = 0;

  // payload layout: SMsgHead followed by the serialized request
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  bool fromCommit = (strcmp(str, "Commit") == 0);
  if (!fromCommit) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t prevTotal = ths->totalReplicaNum;
  bool    isRemoval = (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2);

  if (isRemoval) {
    // is this node still listed in the new membership?
    bool selfKept = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum && !selfKept; ++j) {
      selfKept = strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
                 ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort;
    }

    if (selfKept) {
      // remove other; myNodeInfo stays as it is
      syncNodeResetPeerAndCfg(ths);

      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotal);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // remove myself
      // no need to do anything actually; the following is done to reduce the chance of a disruptive server
      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: this node alone, as a learner
      ths->peersNum = 0;
      ths->raftCfg.cfg.nodeInfo[0] = ths->myNodeInfo;
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // change other
      code = syncNodeRebuildAndCopyIfExist(ths, prevTotal);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // change replica type
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // change myNodeInfo
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      const SNodeInfo* pNew = &cfg.nodeInfo[j];
      bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pNew->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pNew->nodePort;
      if (isSelf && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // change peer and cfg
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);

    // change other
    syncNodeChangeToVoter(ths);

    // change state: a learner promoted to voter starts as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // add replica
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // no need to change myNodeInfo

    // change peer and cfg
    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // change other
    code = syncNodeRebuildAndCopyIfExist(ths, prevTotal);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // no need to change state

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths, "apply_config_change_entry");
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3576

3577
/*
 * Append a raft entry to the log buffer, advance the match index and, where
 * applicable, commit.
 *
 * The entry is destroyed here when it is rejected (payload shorter than
 * SMsgHead) or when the log buffer refuses it; in the latter case the FSM is
 * also executed with the current terrno. The buffer is "proceeded" in every
 * case. With more than one replica the function returns after the optional
 * assigned-leader commit; with a single replica the commit index is updated
 * and the buffer committed directly.
 *
 * NOTE(review): on the single-replica path `code` is overwritten by the result
 * of syncLogBufferCommit, so an earlier append failure is only reported when
 * fsmState is SYNC_FSM_STATE_INCOMPLETE or the commit also fails - confirm
 * this is intended.
 *
 * @param ths    sync node
 * @param pEntry entry to append
 * @param pMsg   originating rpc message, used for logging and passed to proceed; may be NULL
 * @return 0 on success, otherwise an error code
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = -1;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    // append to log buffer failed
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    int32_t fsmRet = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (fsmRet != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(fsmRet));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, with replicating on needed
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = NULL;

  if (pEntry != NULL) {
    trace = &pEntry->originRpcTraceId;
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t assignedIdx = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, assignedIdx, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId,
              assignedIdx, pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex newCommit = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, newCommit);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry")) < 0) {
    sGError(trace,
            "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
            ths->vgId, matchIndex, pMsg, ths->commitIndex, newCommit);
  }

  TAOS_RETURN(code);
}
3645

3646
/*
 * Report whether heartbeat replies have timed out on at least `quorum` peers.
 *
 * Learner peers and peers whose recorded receive time is 0 or -1 are ignored.
 * A node with totalReplicaNum == 1 never times out.
 *
 * @return true when the number of timed-out peers is >= pSyncNode->quorum
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t expired = 0;
  int64_t nowMs = taosGetTimestampMs();

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    noSample = (lastRecv == 0 || lastRecv == -1);
    if (!noSample && nowMs - lastRecv > tsHeartbeatTimeout) {
      expired++;
    }
  }

  return expired >= pSyncNode->quorum;
}
3671

3672
/*
 * Tell whether any snapshot sender among the first totalReplicaNum slots is
 * started. A NULL node yields false.
 */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t s = 0; s < pSyncNode->totalReplicaNum; ++s) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[s];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}
3683

3684
/*
 * Tell whether the node's snapshot receiver (pNewNodeReceiver) exists and is
 * started. A NULL node or a NULL receiver yields false.
 */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }
  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3690

3691
/*
 * Build a no-op entry at the log buffer's end index with the current term and
 * append it through syncNodeAppend.
 *
 * @return TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise
 *         the result of syncNodeAppend
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);

  int32_t code = 0;
  if (pNoop == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    code = syncNodeAppend(ths, pNoop, NULL);
  }
  TAOS_RETURN(code);
}
3705

3706
#ifdef BUILD_NO_CALL
3707
/*
 * Legacy no-op append (compiled only with BUILD_NO_CALL): builds a no-op entry
 * at the log store's write index and, when this node is leader, appends it to
 * the log store and inserts it into the entry cache.
 *
 * The entry is released through the cache handle when one was obtained,
 * otherwise it is destroyed here, including on the append-failure path.
 *
 * @return 0 on success, -1 when the entry cannot be built or appended
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // nothing else references the entry at this point; free it before bailing out
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
3735
#endif
3736

3737
// Handles an incoming sync-heartbeat message.
//
// Steps, in order:
//   1. Record receive time / receive count for the sender in ths->pNextIndex and log the heartbeat.
//   2. Drop the message (return 0) when the sender is not in this node's raft group.
//   3. A learner adopts a higher term carried by the message.
//   4. When the message term equals the current term and this node is neither leader nor assigned
//      leader: remember that the election timer must be reset, take over minMatchIndex, and (for
//      followers and learners) enqueue a local commit command carrying the leader's commit index.
//   5. When the message term is >= the current term and this node is leader / assigned leader:
//      enqueue a local step-down command.
//   6. Build and send a heartbeat reply to the sender, then reset the election timer if step 4
//      requested it.
//
// Returns 0 on success (including a dropped message), or the error code of the failing
// build/send call, which is propagated through TAOS_CHECK_RETURN.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  // Read the previous receive time before overwriting it, so timeDiff is the gap between heartbeats.
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff, pRpcMsg);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  // Only learners adopt a newer term directly from a heartbeat.
  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  // Same message either way; only the log level depends on whether processing was slow.
  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue sync-local-cmd msg(cmd=commit) from heartbeat since %s",
                 ths->vgId, tstrerror(code));
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue sync-local-cmd msg(cmd=commit) from heartbeat, commit-index:%" PRId64
                  ", term:%" PRId64,
                  ths->vgId, pMsg->commitIndex, pMsg->term);
        }
      } else {
        // Nothing to enqueue to: the buffer is still owned here, so release it instead of leaking it.
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down) error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down), new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // Same ownership rule as the commit path: free the command that could not be enqueued.
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    // tsSyncLogHeartbeat raises the normal-path log from debug to info.
    if (tsSyncLogHeartbeat) {
      sGInfo(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    } else {
      sGDebug(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    }
  }

  // NOTE(review): a failed send returns here, before the election timer is reset -- confirm this is intended.
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3879

3880
// Handles a sync-heartbeat-reply from a peer.
//
// Looks up the log replication manager for the replying peer, logs the reply together with the
// network delay (now - reply timestamp) and the gap since the previous reply, records the new
// receive time / receive count in ths->pMatchIndex, and finally lets the replication manager
// process the reply.
//
// Returns the result of syncLogReplProcessHeartbeatReply. When no replication manager exists for
// the peer, returns terrno if it is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Zero-padded 16-digit hex address; the previous "0x016%" spelling printed a literal "016".
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  // Fetch the previous receive time before it is overwritten below.
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  syncIndexMgrIncRecvCount(ths->pMatchIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3901

3902
#ifdef BUILD_NO_CALL
3903
// Legacy heartbeat-reply handler (only compiled under BUILD_NO_CALL).
// Logs the reply, passing (now - reply timestamp) for both elapsed-time arguments, and stores
// the current time as the peer's last receive time in ths->pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();
  int64_t             elapsedMs = nowMs - pReply->timeStamp;

  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // The stored receive time is what later decides whether the peer is considered alive.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
3914
#endif
3915

3916
// Executes a sync-local-cmd message that was previously enqueued by this node.
//
//   SYNC_LOCAL_CMD_STEP_DOWN                -> step down to the term carried by the command.
//   SYNC_LOCAL_CMD_FOLLOWER_CMT/LEARNER_CMT -> if the command's term equals the log buffer's last
//                                              match term, update the commit index; then, unless
//                                              the fsm state is incomplete, commit the log buffer
//                                              up to ths->commitIndex.
//   anything else                           -> logged as an error.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative; 0 in every other
// case (an empty log buffer and a failed commit are only logged).
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId, "localCmd");
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // Commit command from here on.
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // The commit index is only taken over when the command's term matches the local log's term.
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
             ths->commitIndex);
    }
  }

  return 0;
}
3948

3949
// TLA+ Spec
3950
// ClientRequest(i, v) ==
3951
//     /\ state[i] = Leader
3952
//     /\ LET entry == [term  |-> currentTerm[i],
3953
//                      value |-> v]
3954
//            newLog == Append(log[i], entry)
3955
//        IN  log' = [log EXCEPT ![i] = newLog]
3956
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3957
//                    leaderVars, commitIndex>>
3958
//
3959

3960
// Turns a client request into a raft entry and appends it on the leader.
//
// The entry is built at the log buffer's end index with the current term. For
// TDMT_SYNC_CONFIG_CHANGE messages a response stub holding a copy of pMsg is registered first and
// its sequence number is stored in the entry.
//
// On a leader / assigned leader:
//   - *pRetIndex (when pRetIndex is non-NULL) receives the entry's index;
//   - a config-change entry is validated with syncNodeCheckChangeConfig:
//       < 0 : the entry is destroyed and the code is returned;
//       > 0 : the stored response stub is fetched and deleted, a response is sent if the stub
//             carries a handle, the entry is destroyed and the code is returned;
//   - otherwise the entry is passed to syncNodeAppend and its result is returned.
// On any other state the entry is destroyed and TSDB_CODE_SYN_INTERNAL_ERROR is returned, which is
// also the result when the entry cannot be built.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);

  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType != TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, curTerm, endIndex);
  } else {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, curTerm, endIndex, &pMsg->info.traceId);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  stubSeq = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = stubSeq;
  }

  // Only a leader (regular or assigned) may append client entries.
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = endIndex;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    int32_t checkCode = syncNodeCheckChangeConfig(ths, pEntry);
    if (checkCode < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      TAOS_RETURN(checkCode);
    }

    if (checkCode > 0) {
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      TAOS_RETURN(checkCode);
    }
  }

  return syncNodeAppend(ths, pEntry, pMsg);
}
4024

4025
// Maps a sync state to its lowercase display name.
// Any value without a dedicated case yields "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      break;
  }

  return name;
}
4045

4046
// Finds this node inside pNewCfg and stores its position in pNewCfg->myIndex.
//
// Each configured node is turned into a raft id (its SYNC_ADDR plus this node's vgId) and compared
// against ths->myRaftId. Returns 0 on the first match, TSDB_CODE_SYN_INTERNAL_ERROR when none of
// the totalReplicaNum nodes matches (myIndex is then left untouched).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;

  while (pos < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
4061

4062
// True when all three hold: the node has exactly one replica, syncUtilUserCommit accepts the
// message type, and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}
4065

4066
// Tells whether pRaftId equals one of the first totalReplicaNum entries of ths->replicasId.
// The scan stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool    found = false;
  int32_t pos = 0;

  while (!found && pos < ths->totalReplicaNum) {
    found = syncUtilSameId(&((ths->replicasId)[pos]), pRaftId);
    ++pos;
  }

  return found;
}
4074

4075
// Returns the snapshot sender stored at the same position as the replica whose id equals pDestId,
// or NULL when no replica matches. All totalReplicaNum replicas are scanned without stopping, so
// if several ids matched, the sender of the last one would be returned.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              pos = 0;

  while (pos < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[pos]))) {
      pFound = (ths->senders)[pos];
    }
    ++pos;
  }

  return pFound;
}
4084

4085
// Returns a pointer to the peer heartbeat timer stored at the same position as the replica whose
// id equals pDestId, or NULL when no replica matches. All totalReplicaNum replicas are scanned
// without stopping, so if several ids matched, the timer of the last one would be returned.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     pos = 0;

  while (pos < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[pos]))) {
      pFound = &((ths->peerHeartbeatTimerArr)[pos]);
    }
    ++pos;
  }

  return pFound;
}
4094

4095
// Returns a pointer to the peer state stored at the same position as the replica whose id equals
// pDestId, or NULL when no replica matches. All totalReplicaNum replicas are scanned without
// stopping, so if several ids matched, the state of the last one would be returned.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     pos = 0;

  while (pos < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[pos]))) {
      pFound = &((ths->peerStates)[pos]);
    }
    ++pos;
  }

  return pFound;
}
4104

4105
#ifdef BUILD_NO_CALL
4106
// Decides whether an append-entries message should be sent to pDestId (only compiled under
// BUILD_NO_CALL).
//
// Returns false when no peer state exists for the destination, or when the index about to be sent
// (prevLogIndex + 1) equals the peer's last sent index and less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS has passed since that send. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // A different index is always worth sending.
  if (pPeer->lastSendIndex != nextIndex) {
    return true;
  }

  // Same index as last time: only resend once the timeout has elapsed.
  return !(nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
}
4122

4123
// Checks whether the node may start a change right now (only compiled under BUILD_NO_CALL).
//
// Returns false, after logging the reason, when any of these holds:
//   - the node's `changing` flag is set;
//   - commitIndex is at least SYNC_INDEX_BEGIN but differs from the last log index;
//   - a snapshot sender for one of the peers is flagged as started.
// Returns true otherwise.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
4147
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc