• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3847

11 Apr 2025 06:14AM UTC coverage: 62.612% (+0.2%) from 62.398%
#3847

push

travis-ci

web-flow
Merge pull request #30758 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

154571 of 315259 branches covered (49.03%)

Branch coverage included in aggregate %.

63 of 80 new or added lines in 9 files covered. (78.75%)

946 existing lines in 106 files now uncovered.

240135 of 315138 relevant lines covered (76.2%)

19768383.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.59
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
13,526✔
64
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);
13,526✔
65
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion);
13,526✔
66
  if (pSyncNode == NULL) {
13,529!
67
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
×
68
    return -1;
×
69
  }
70

71
  pSyncNode->rid = syncNodeAdd(pSyncNode);
13,529✔
72
  if (pSyncNode->rid < 0) {
13,529!
73
    syncNodeClose(pSyncNode);
×
74
    return -1;
×
75
  }
76

77
  pSyncNode->pingBaseLine = pSyncInfo->pingMs;
13,529✔
78
  pSyncNode->pingTimerMS = pSyncInfo->pingMs;
13,529✔
79
  pSyncNode->electBaseLine = pSyncInfo->electMs;
13,529✔
80
  pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs;
13,529✔
81
  pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
13,529✔
82
  pSyncNode->msgcb = pSyncInfo->msgcb;
13,529✔
83
  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
13,529✔
84
  return pSyncNode->rid;
13,529✔
85
}
86

87
int32_t syncStart(int64_t rid) {
13,518✔
88
  int32_t    code = 0;
13,518✔
89
  int32_t    vgId = 0;
13,518✔
90
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,518✔
91
  if (pSyncNode == NULL) {
13,528!
92
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
93
    if (terrno != 0) code = terrno;
×
94
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
95
    TAOS_RETURN(code);
×
96
  }
97
  vgId = pSyncNode->vgId;
13,528✔
98
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
13,528✔
99

100
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
13,528!
101
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
102
    goto _err;
×
103
  }
104
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
13,528✔
105

106
  if ((code = syncNodeStart(pSyncNode)) < 0) {
13,528!
107
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
108
    goto _err;
×
109
  }
110
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
13,528✔
111

112
  syncNodeRelease(pSyncNode);
13,528✔
113

114
  sInfo("vgId:%d, sync started", vgId);
13,528✔
115

116
  TAOS_RETURN(code);
13,528✔
117

118
_err:
×
119
  syncNodeRelease(pSyncNode);
×
120
  TAOS_RETURN(code);
×
121
}
122

123
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
27,826✔
124
  int32_t    code = 0;
27,826✔
125
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
27,826✔
126

127
  if (pSyncNode == NULL) {
27,828!
128
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
129
    if (terrno != 0) code = terrno;
×
130
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
131
    TAOS_RETURN(code);
×
132
  }
133

134
  *cfg = pSyncNode->raftCfg.cfg;
27,828✔
135

136
  syncNodeRelease(pSyncNode);
27,828✔
137

138
  return 0;
27,826✔
139
}
140

141
void syncStop(int64_t rid) {
13,527✔
142
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,527✔
143
  if (pSyncNode != NULL) {
13,528!
144
    pSyncNode->isStart = false;
13,528✔
145
    syncNodeRelease(pSyncNode);
13,528✔
146
    syncNodeRemove(rid);
13,528✔
147
  }
148
}
13,528✔
149

150
void syncPreStop(int64_t rid) {
13,527✔
151
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,527✔
152
  if (pSyncNode != NULL) {
13,528!
153
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
13,528!
154
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
155
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
156
    }
157
    syncNodePreClose(pSyncNode);
13,528✔
158
    syncNodeRelease(pSyncNode);
13,527✔
159
  }
160
}
13,528✔
161

162
void syncPostStop(int64_t rid) {
11,761✔
163
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
11,761✔
164
  if (pSyncNode != NULL) {
11,762!
165
    syncNodePostClose(pSyncNode);
11,762✔
166
    syncNodeRelease(pSyncNode);
11,761✔
167
  }
168
}
11,762✔
169

170
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
1,678✔
171
  if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
1,678!
172
  return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
1,678!
173
}
174

175
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
1,793✔
176
  int32_t    code = 0;
1,793✔
177
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,793✔
178
  if (pSyncNode == NULL) {
1,793!
179
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
180
    if (terrno != 0) code = terrno;
×
181
    TAOS_RETURN(code);
×
182
  }
183

184
  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
1,793✔
185
    syncNodeRelease(pSyncNode);
115✔
186
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
115!
187
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
188
    return 0;
115✔
189
  }
190

191
  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
1,678!
192
    syncNodeRelease(pSyncNode);
×
193
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
×
194
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
×
195
    TAOS_RETURN(code);
×
196
  }
197

198
  TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg));
1,678!
199

200
  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
1,678!
201
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
×
202
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
×
203
    TAOS_RETURN(code);
×
204
  }
205

206
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,678!
207
    // TODO check return value
208
    TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
1,479!
209

210
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
23,664✔
211
      TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]));
22,185!
212
    }
213

214
    TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
1,479!
215
    // syncNodeReplicate(pSyncNode);
216
  }
217

218
  syncNodeRelease(pSyncNode);
1,678✔
219
  TAOS_RETURN(code);
1,678✔
220
}
221

222
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
2,954,321✔
223
  int32_t code = -1;
2,954,321✔
224
  if (!syncIsInit()) {
2,954,321!
225
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
226
    if (terrno != 0) code = terrno;
×
227
    TAOS_RETURN(code);
×
228
  }
229

230
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,954,323✔
231
  if (pSyncNode == NULL) {
2,954,706✔
232
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
262✔
233
    if (terrno != 0) code = terrno;
262!
234
    TAOS_RETURN(code);
×
235
  }
236

237
  switch (pMsg->msgType) {
2,954,444!
238
    case TDMT_SYNC_HEARTBEAT:
38,518✔
239
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
38,518✔
240
      break;
38,519✔
241
    case TDMT_SYNC_HEARTBEAT_REPLY:
37,761✔
242
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
37,761✔
243
      break;
37,761✔
244
    case TDMT_SYNC_TIMEOUT:
238,946✔
245
      code = syncNodeOnTimeout(pSyncNode, pMsg);
238,946✔
246
      break;
238,870✔
247
    case TDMT_SYNC_TIMEOUT_ELECTION:
1,353✔
248
      code = syncNodeOnTimeout(pSyncNode, pMsg);
1,353✔
249
      break;
1,353✔
250
    case TDMT_SYNC_CLIENT_REQUEST:
230,943✔
251
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
230,943✔
252
      break;
230,945✔
253
    case TDMT_SYNC_REQUEST_VOTE:
2,112✔
254
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
2,112✔
255
      break;
2,112✔
256
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
1,990✔
257
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
1,990✔
258
      break;
1,990✔
259
    case TDMT_SYNC_APPEND_ENTRIES:
1,175,295✔
260
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
1,175,295✔
261
      break;
1,175,301✔
262
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
1,174,877✔
263
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
1,174,877✔
264
      break;
1,174,876✔
265
    case TDMT_SYNC_SNAPSHOT_SEND:
7,113✔
266
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
7,113✔
267
      break;
7,113✔
268
    case TDMT_SYNC_SNAPSHOT_RSP:
7,219✔
269
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
7,219✔
270
      break;
7,219✔
271
    case TDMT_SYNC_LOCAL_CMD:
38,302✔
272
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
38,302✔
273
      break;
38,302✔
274
    case TDMT_SYNC_FORCE_FOLLOWER:
15✔
275
      code = syncForceBecomeFollower(pSyncNode, pMsg);
15✔
276
      break;
15✔
277
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
×
278
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
×
279
      break;
×
280
    default:
×
281
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
282
  }
283

284
  syncNodeRelease(pSyncNode);
2,954,376✔
285
  if (code != 0) {
2,954,446✔
286
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
16!
287
           tstrerror(code));
288
  }
289
  TAOS_RETURN(code);
2,954,446✔
290
}
291

292
int32_t syncLeaderTransfer(int64_t rid) {
13,528✔
293
  int32_t    code = 0;
13,528✔
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,528✔
295
  if (pSyncNode == NULL) {
13,528!
296
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
297
    if (terrno != 0) code = terrno;
×
298
    TAOS_RETURN(code);
×
299
  }
300

301
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
13,528✔
302
  syncNodeRelease(pSyncNode);
13,528✔
303
  return ret;
13,527✔
304
}
305

306
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
15✔
307
  SRaftId id = {0};
15✔
308
  syncNodeBecomeFollower(ths, id, "force election");
15✔
309

310
  SRpcMsg rsp = {
15✔
311
      .code = 0,
312
      .pCont = pRpcMsg->info.rsp,
15✔
313
      .contLen = pRpcMsg->info.rspLen,
15✔
314
      .info = pRpcMsg->info,
315
  };
316
  tmsgSendRsp(&rsp);
15✔
317

318
  return 0;
15✔
319
}
320

321
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
×
322
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
323
  void*   pHead = NULL;
×
324
  int32_t contLen = 0;
×
325

326
  SVArbSetAssignedLeaderReq req = {0};
×
327
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
×
328
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
×
329
    code = TSDB_CODE_INVALID_MSG;
×
330
    goto _OVER;
×
331
  }
332

333
  if (ths->arbTerm > req.arbTerm) {
×
334
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
×
335
          ths->arbTerm, req.arbTerm);
336
    goto _OVER;
×
337
  }
338

339
  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);
×
340

341
  if (!req.force) {
×
342
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
×
343
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
×
344
            req.memberToken);
345
      goto _OVER;
×
346
    }
347
  }
348

349
  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
350
    code = TSDB_CODE_SUCCESS;
×
351
    raftStoreNextTerm(ths);
×
352
    if (terrno != TSDB_CODE_SUCCESS) {
×
353
      code = terrno;
×
354
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
×
355
      goto _OVER;
×
356
    }
357
    syncNodeBecomeAssignedLeader(ths);
×
358

359
    if ((code = syncNodeAppendNoop(ths)) < 0) {
×
360
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
×
361
    }
362
  }
363

364
  SVArbSetAssignedLeaderRsp rsp = {0};
×
365
  rsp.arbToken = req.arbToken;
×
366
  rsp.memberToken = req.memberToken;
×
367
  rsp.vgId = ths->vgId;
×
368

369
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
×
370
  if (contLen <= 0) {
×
371
    code = TSDB_CODE_OUT_OF_MEMORY;
×
372
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
×
373
    goto _OVER;
×
374
  }
375
  pHead = rpcMallocCont(contLen);
×
376
  if (!pHead) {
×
377
    code = terrno;
×
378
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
×
379
    goto _OVER;
×
380
  }
381
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
×
382
    code = TSDB_CODE_OUT_OF_MEMORY;
×
383
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
×
384
    rpcFreeCont(pHead);
×
385
    goto _OVER;
×
386
  }
387

388
  code = TSDB_CODE_SUCCESS;
×
389

390
_OVER:;
×
391
  SRpcMsg rspMsg = {
×
392
      .code = code,
393
      .pCont = pHead,
394
      .contLen = contLen,
395
      .info = pRpcMsg->info,
396
  };
397

398
  tmsgSendRsp(&rspMsg);
×
399

400
  tFreeSVArbSetAssignedLeaderReq(&req);
×
401
  TAOS_RETURN(code);
×
402
}
403

404
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
405
  int32_t    code = 0;
×
406
  SSyncNode* pNode = syncNodeAcquire(rid);
×
407
  if (pNode == NULL) {
×
408
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
409
    if (terrno != 0) code = terrno;
×
410
    TAOS_RETURN(code);
×
411
  }
412

413
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
414
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
415
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
416

417
  syncNodeRelease(pNode);
×
418
  if (ret == 1) {
×
419
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
420
    code = rpcSendResponse(&rpcMsg);
×
421
    return code;
×
422
  } else {
423
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
424
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
425
  }
426
}
427

428
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
44,690✔
429
  SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
44,690✔
430

431
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
133,648✔
432
    SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
88,958✔
433
    if (minMatchIndex == SYNC_INDEX_INVALID) {
88,958✔
434
      minMatchIndex = matchIndex;
47,935✔
435
    } else if (matchIndex > 0 && matchIndex < minMatchIndex) {
41,023✔
436
      minMatchIndex = matchIndex;
1,918✔
437
    }
438
  }
439
  return minMatchIndex;
44,690✔
440
}
441

442
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
811✔
443
  return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
811✔
444
}
445

446
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
31,066✔
447
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
31,066✔
448
  int32_t    code = 0;
31,066✔
449
  if (pSyncNode == NULL) {
31,066✔
450
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
38✔
451
    if (terrno != 0) code = terrno;
38!
452
    sError("sync begin snapshot error");
38!
453
    TAOS_RETURN(code);
38✔
454
  }
455

456
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
31,028✔
457
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
31,028✔
458
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
31,028✔
459

460
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
31,028!
461
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
59!
462
    syncNodeRelease(pSyncNode);
59✔
463
    return 0;
59✔
464
  }
465

466
  int64_t logRetention = 0;
30,969✔
467

468
  if (syncNodeIsMnode(pSyncNode)) {
30,969✔
469
    // mnode
470
    logRetention = tsMndLogRetention;
3,199✔
471
  } else {
472
    // vnode
473
    if (pSyncNode->replicaNum > 1) {
27,769✔
474
      logRetention = SYNC_VNODE_LOG_RETENTION;
371✔
475
    }
476
  }
477

478
  if (pSyncNode->totalReplicaNum > 1) {
30,968✔
479
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
824✔
480
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
155!
481
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
13!
482
              lastApplyIndex);
483
      syncNodeRelease(pSyncNode);
13✔
484
      return 0;
13✔
485
    }
486
    SyncIndex retentionIndex =
811✔
487
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
811✔
488
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
811✔
489
  }
490

491
_DEL_WAL:
30,144✔
492

493
  do {
494
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
30,955✔
495
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
30,955✔
496
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
30,955✔
497
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
30,955✔
498
    if (lastApplyIndex <= walCommitVer) {
30,955!
499
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
30,955✔
500

501
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
30,955!
502
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
30,955✔
503
        pSyncNode->snapshottingTime = taosGetTimestampMs();
30,956✔
504

505
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
30,956✔
506
        if (code == 0) {
30,955!
507
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
30,955✔
508
                  pSyncNode->snapshottingIndex, lastApplyIndex);
509
        } else {
510
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
511
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
512
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
513
        }
514

515
      } else {
516
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
517
                snapshottingIndex, lastApplyIndex);
518
      }
519
    }
520
  } while (0);
521

522
  syncNodeRelease(pSyncNode);
30,956✔
523
  TAOS_RETURN(code);
30,956✔
524
}
525

526
int32_t syncEndSnapshot(int64_t rid) {
31,028✔
527
  int32_t    code = 0;
31,028✔
528
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
31,028✔
529
  if (pSyncNode == NULL) {
31,028!
530
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
531
    if (terrno != 0) code = terrno;
×
532
    sError("sync end snapshot error");
×
533
    TAOS_RETURN(code);
×
534
  }
535

536
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
31,028✔
537
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
30,956✔
538
    code = walEndSnapshot(pData->pWal);
30,956✔
539
    if (code != 0) {
30,956!
540
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
541
      syncNodeRelease(pSyncNode);
×
542
      TAOS_RETURN(code);
×
543
    } else {
544
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
30,956✔
545
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
30,956✔
546
    }
547
  }
548

549
  syncNodeRelease(pSyncNode);
31,028✔
550
  TAOS_RETURN(code);
31,028✔
551
}
552

553
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
18,556,904✔
554
  if (pSyncNode == NULL) {
18,556,904!
555
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
556
    sError("sync ready for read error");
×
557
    return false;
×
558
  }
559

560
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
18,556,904!
561
    terrno = TSDB_CODE_SYN_NOT_LEADER;
71,375✔
562
    return false;
71,375✔
563
  }
564

565
  if (!pSyncNode->restoreFinish) {
18,485,529✔
566
    terrno = TSDB_CODE_SYN_RESTORING;
19,493✔
567
    return false;
19,493✔
568
  }
569

570
  return true;
18,466,036✔
571
}
572

573
bool syncIsReadyForRead(int64_t rid) {
17,264,967✔
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,264,967✔
575
  if (pSyncNode == NULL) {
17,268,050!
576
    sError("sync ready for read error");
×
577
    return false;
×
578
  }
579

580
  bool ready = syncNodeIsReadyForRead(pSyncNode);
17,268,050✔
581

582
  syncNodeRelease(pSyncNode);
17,268,252✔
583
  return ready;
17,267,682✔
584
}
585

586
#ifdef BUILD_NO_CALL
587
bool syncSnapshotSending(int64_t rid) {
588
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
589
  if (pSyncNode == NULL) {
590
    return false;
591
  }
592

593
  bool b = syncNodeSnapshotSending(pSyncNode);
594
  syncNodeRelease(pSyncNode);
595
  return b;
596
}
597

598
bool syncSnapshotRecving(int64_t rid) {
599
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
600
  if (pSyncNode == NULL) {
601
    return false;
602
  }
603

604
  bool b = syncNodeSnapshotRecving(pSyncNode);
605
  syncNodeRelease(pSyncNode);
606
  return b;
607
}
608
#endif
609

610
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
13,528✔
611
  if (pSyncNode->peersNum == 0) {
13,528✔
612
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
10,229✔
613
    return 0;
10,229✔
614
  }
615

616
  int32_t ret = 0;
3,299✔
617
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1) {
3,299✔
618
    SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
1,019✔
619
    if (pSyncNode->peersNum == 2) {
1,019✔
620
      SyncIndex matchIndex0 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
730✔
621
      SyncIndex matchIndex1 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
730✔
622
      if (matchIndex1 > matchIndex0) {
730✔
623
        newLeader = (pSyncNode->peersNodeInfo)[1];
41✔
624
      }
625
    }
626
    ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
1,019✔
627
  }
628

629
  return ret;
3,299✔
630
}
631

632
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
1,019✔
633
  if (pSyncNode->replicaNum == 1) {
1,019!
634
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
×
635
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
636
  }
637

638
  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
1,019!
639

640
  SRpcMsg rpcMsg = {0};
1,019✔
641
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId));
1,019!
642

643
  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
1,019✔
644
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
1,019✔
645
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
1,019✔
646
  pMsg->newNodeInfo = newLeader;
1,019✔
647

648
  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
1,019✔
649
  rpcFreeCont(rpcMsg.pCont);
1,019✔
650
  return ret;
1,019✔
651
}
652

653
SSyncState syncGetState(int64_t rid) {
6,410,902✔
654
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
6,410,902✔
655

656
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,410,902✔
657
  if (pSyncNode != NULL) {
6,412,453!
658
    state.state = pSyncNode->state;
6,412,506✔
659
    state.roleTimeMs = pSyncNode->roleTimeMs;
6,412,506✔
660
    state.startTimeMs = pSyncNode->startTime;
6,412,506✔
661
    state.restored = pSyncNode->restoreFinish;
6,412,506✔
662
    if (pSyncNode->vgId != 1) {
6,412,506✔
663
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,291,946✔
664
    } else {
665
      state.canRead = state.restored;
5,120,560✔
666
    }
667
    /*
668
    double progress = 0;
669
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
670
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
671
      state.progress = (int32_t)(progress * 100);
672
    }
673
    else{
674
      state.progress = -1;
675
    }
676
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
677
            "progress:%lf, progress:%d",
678
          pSyncNode->vgId,
679
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
680
    */
681
    state.term = raftStoreGetTerm(pSyncNode);
6,412,459✔
682
    syncNodeRelease(pSyncNode);
6,412,663✔
683
  }
684

685
  return state;
6,412,594✔
686
}
687

688
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
1,110,934✔
689
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,110,934✔
690
  if (pSyncNode != NULL) {
1,110,934!
691
    *syncCommitIndex = pSyncNode->commitIndex;
1,110,934✔
692
    syncNodeRelease(pSyncNode);
1,110,934✔
693
  }
694
}
1,110,934✔
695

696
int32_t syncGetArbToken(int64_t rid, char* outToken) {
67,193✔
697
  int32_t    code = 0;
67,193✔
698
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
67,193✔
699
  if (pSyncNode == NULL) {
67,193!
700
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
701
    if (terrno != 0) code = terrno;
×
702
    TAOS_RETURN(code);
×
703
  }
704

705
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
67,193✔
706
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
67,193✔
707
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
67,193✔
708
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
67,193✔
709

710
  syncNodeRelease(pSyncNode);
67,193✔
711
  TAOS_RETURN(code);
67,193✔
712
}
713

714
int32_t syncCheckSynced(int64_t rid) {
7✔
715
  int32_t    code = 0;
7✔
716
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
7✔
717
  if (pSyncNode == NULL) {
7!
718
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
719
    if (terrno != 0) code = terrno;
×
720
    TAOS_RETURN(code);
×
721
  }
722

723
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
7!
724
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
725
    syncNodeRelease(pSyncNode);
×
726
    TAOS_RETURN(code);
×
727
  }
728

729
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
7✔
730
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
7!
731

732
  syncNodeRelease(pSyncNode);
7✔
733
  TAOS_RETURN(code);
7✔
734
}
735

736
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
108✔
737
  int32_t    code = 0;
108✔
738
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
108✔
739
  if (pSyncNode == NULL) {
108!
740
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
741
    if (terrno != 0) code = terrno;
×
742
    TAOS_RETURN(code);
×
743
  }
744

745
  pSyncNode->arbTerm = TMAX(arbTerm, pSyncNode->arbTerm);
108✔
746
  syncNodeRelease(pSyncNode);
108✔
747
  TAOS_RETURN(code);
108✔
748
}
749

750
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
1,684,506✔
751
  if (pSyncNode->raftCfg.configIndexCount < 1) {
1,684,506!
752
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
×
753
           pSyncNode->raftCfg.configIndexCount);
754
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
755
    return -2;
×
756
  }
757
  SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
1,684,506✔
758

759
  for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
3,413,561✔
760
    if ((pSyncNode->raftCfg.configIndexArr)[i] > lastIndex &&
1,729,055✔
761
        (pSyncNode->raftCfg.configIndexArr)[i] <= snapshotLastApplyIndex) {
44,546!
762
      lastIndex = (pSyncNode->raftCfg.configIndexArr)[i];
44,546✔
763
    }
764
  }
765
  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
1,684,506✔
766
         lastIndex);
767

768
  return lastIndex;
1,684,509✔
769
}
770

771
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
120,914✔
772
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
245,736✔
773
    if (strcmp(pEp->fqdn, (pSyncNode->peersNodeInfo)[i].nodeFqdn) == 0 &&
204,194✔
774
        pEp->port == (pSyncNode->peersNodeInfo)[i].nodePort) {
204,193✔
775
      return pSyncNode->peersId[i];
79,372✔
776
    }
777
  }
778
  return EMPTY_RAFT_ID;
41,542✔
779
}
780

781
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
41,641✔
782
  pEpSet->numOfEps = 0;
41,641✔
783

784
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
41,641✔
785
  if (pSyncNode == NULL) return;
41,641!
786

787
  int index = -1;
41,641✔
788

789
  int j = 0;
41,641✔
790
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
163,581✔
791
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
121,940✔
792
    SEp* pEp = &pEpSet->eps[j];
120,915✔
793
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
120,915✔
794
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
120,915✔
795
    pEpSet->numOfEps++;
120,915✔
796
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
120,915✔
797
    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
120,915✔
798
    if (id.addr == pSyncNode->leaderCache.addr && id.vgId == pSyncNode->leaderCache.vgId && id.addr != 0 &&
120,915!
799
        id.vgId != 0)
16,657!
800
      index = j;
16,657✔
801
    j++;
120,915✔
802
  }
803
  if (pEpSet->numOfEps > 0) {
41,641!
804
    if (index != -1) {
41,641✔
805
      pEpSet->inUse = index;
16,657✔
806
    } else {
807
      if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
24,984✔
808
          pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
707!
809
        pEpSet->inUse = pSyncNode->raftCfg.cfg.myIndex;
707✔
810
      } else {
811
        pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
24,277✔
812
      }
813
    }
814
    // pEpSet->inUse = 0;
815
  }
816
  epsetSort(pEpSet);
41,641✔
817

818
  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
41,641!
819
  syncNodeRelease(pSyncNode);
41,641✔
820
}
821

822
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
10,456,782✔
823
  int32_t    code = 0;
10,456,782✔
824
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
10,456,782✔
825
  if (pSyncNode == NULL) {
10,456,954✔
826
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
1✔
827
    if (terrno != 0) code = terrno;
1!
828
    sError("sync propose error");
1!
829
    TAOS_RETURN(code);
1✔
830
  }
831

832
  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq);
10,456,953✔
833
  syncNodeRelease(pSyncNode);
10,456,618✔
834
  return ret;
10,456,984✔
835
}
836

837
int32_t syncCheckMember(int64_t rid) {
×
838
  int32_t    code = 0;
×
839
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
840
  if (pSyncNode == NULL) {
×
841
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
842
    if (terrno != 0) code = terrno;
×
843
    sError("sync propose error");
×
844
    TAOS_RETURN(code);
×
845
  }
846

847
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
848
    syncNodeRelease(pSyncNode);
×
849
    return TSDB_CODE_SYN_WRONG_ROLE;
×
850
  }
851

852
  syncNodeRelease(pSyncNode);
×
853
  return 0;
×
854
}
855

856
int32_t syncIsCatchUp(int64_t rid) {
4,773✔
857
  int32_t    code = 0;
4,773✔
858
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,773✔
859
  if (pSyncNode == NULL) {
4,773!
860
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
861
    if (terrno != 0) code = terrno;
×
862
    sError("sync Node Acquire error since %d", ERRNO);
×
863
    TAOS_RETURN(code);
×
864
  }
865

866
  int32_t isCatchUp = 0;
4,773✔
867
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
4,773!
868
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,134!
869
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,134✔
870
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
4,512!
871
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
872
          pSyncNode->pLogBuf->matchIndex);
873
    isCatchUp = 0;
4,512✔
874
  } else {
875
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
261!
876
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
877
    isCatchUp = 1;
261✔
878
  }
879

880
  syncNodeRelease(pSyncNode);
4,773✔
881
  return isCatchUp;
4,773✔
882
}
883

884
ESyncRole syncGetRole(int64_t rid) {
4,773✔
885
  int32_t    code = 0;
4,773✔
886
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,773✔
887
  if (pSyncNode == NULL) {
4,773!
888
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
889
    if (terrno != 0) code = terrno;
×
890
    sError("sync Node Acquire error since %d", ERRNO);
×
891
    TAOS_RETURN(code);
×
892
  }
893

894
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
4,773✔
895

896
  syncNodeRelease(pSyncNode);
4,773✔
897
  return role;
4,773✔
898
}
899

900
int64_t syncGetTerm(int64_t rid) {
26,729✔
901
  int32_t    code = 0;
26,729✔
902
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
26,729✔
903
  if (pSyncNode == NULL) {
26,729!
904
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
905
    if (terrno != 0) code = terrno;
×
906
    sError("sync Node Acquire error since %d", ERRNO);
×
907
    TAOS_RETURN(code);
×
908
  }
909

910
  int64_t term = raftStoreGetTerm(pSyncNode);
26,729✔
911

912
  syncNodeRelease(pSyncNode);
26,729✔
913
  return term;
26,729✔
914
}
915

916
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
10,457,822✔
917
  int32_t code = 0;
10,457,822✔
918
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
10,457,822!
919
    code = TSDB_CODE_SYN_NOT_LEADER;
11,171✔
920
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
11,171!
921
    TAOS_RETURN(code);
11,171✔
922
  }
923

924
  if (!pSyncNode->restoreFinish) {
10,446,651✔
925
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
91✔
926
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
91!
927
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
928
    TAOS_RETURN(code);
91✔
929
  }
930

931
  // heartbeat timeout
932
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
10,446,560!
933
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
6✔
934
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
6!
935
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
936
    TAOS_RETURN(code);
6✔
937
  }
938

939
  // optimized one replica
940
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
10,446,551✔
941
    SyncIndex retIndex;
942
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
10,214,466✔
943
    if (code >= 0) {
10,214,135!
944
      pMsg->info.conn.applyIndex = retIndex;
10,214,221✔
945
      pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
10,214,221✔
946

947
      // after raft member change, need to handle 1->2 switching point
948
      // at this point, need to switch entry handling thread
949
      if (pSyncNode->replicaNum == 1) {
10,214,530✔
950
        sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
10,214,513!
951
                retIndex, TMSG_INFO(pMsg->msgType));
952
        return 1;
10,214,350✔
953
      } else {
954
        sGDebug(&pMsg->info.traceId,
17!
955
                "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
956
                pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
957
        return 0;
×
958
      }
959
    } else {
960
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
961
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
×
962
             TMSG_INFO(pMsg->msgType));
963
      TAOS_RETURN(code);
×
964
    }
965
  } else {
966
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
232,253✔
967
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
232,264✔
968
    SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};
232,271✔
969
    int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
232,271✔
970
    if (code != 0) {
232,270!
971
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
×
972
      code = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
×
973
      TAOS_RETURN(code);
×
974
    }
975

976
    sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
232,270!
977
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
232,270✔
978
    if (code != 0) {
232,268✔
979
      sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
1,325!
980
      TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
1,325✔
981
    }
982

983
    if (seq != NULL) *seq = seqNum;
232,233✔
984
    TAOS_RETURN(code);
232,233✔
985
  }
986
}
987

988
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
225,116✔
989
  pSyncTimer->pTimer = NULL;
225,116✔
990
  pSyncTimer->counter = 0;
225,116✔
991
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
225,116✔
992
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
225,116✔
993
  pSyncTimer->destId = destId;
225,116✔
994
  pSyncTimer->timeStamp = taosGetTimestampMs();
225,117✔
995
  atomic_store_64(&pSyncTimer->logicClock, 0);
225,117✔
996
  return 0;
225,117✔
997
}
998

999
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
2,419✔
1000
  int32_t code = 0;
2,419✔
1001
  int64_t tsNow = taosGetTimestampMs();
2,419✔
1002
  if (syncIsInit()) {
2,419!
1003
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
2,419✔
1004
    if (pData == NULL) {
2,419!
1005
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
2,419!
1006
      pData->rid = syncHbTimerDataAdd(pData);
2,419✔
1007
    }
1008
    pSyncTimer->hbDataRid = pData->rid;
2,419✔
1009
    pSyncTimer->timeStamp = tsNow;
2,419✔
1010

1011
    pData->syncNodeRid = pSyncNode->rid;
2,419✔
1012
    pData->pTimer = pSyncTimer;
2,419✔
1013
    pData->destId = pSyncTimer->destId;
2,419✔
1014
    pData->logicClock = pSyncTimer->logicClock;
2,419✔
1015
    pData->execTime = tsNow + pSyncTimer->timerMS;
2,419✔
1016

1017
    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
2,419!
1018
           pData->destId.addr, pSyncTimer->timerMS);
1019

1020
    bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
2,419✔
1021
                                        syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
2,419✔
1022
    if (stopped) {
2,419!
1023
      sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
×
1024
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1025
    }
1026
  } else {
1027
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1028
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
×
1029
  }
1030
  return code;
2,419✔
1031
}
1032

1033
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
24,425✔
1034
  int32_t ret = 0;
24,425✔
1035
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
24,425✔
1036
  bool stop = taosTmrStop(pSyncTimer->pTimer);
24,425✔
1037
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stop);
24,426✔
1038
  pSyncTimer->pTimer = NULL;
24,425✔
1039
  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
24,425✔
1040
  pSyncTimer->hbDataRid = -1;
24,425✔
1041
  return ret;
24,425✔
1042
}
1043

1044
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
13,529✔
1045
  int32_t code = 0;
13,529✔
1046
  if (pNode->pLogStore == NULL) {
13,529!
1047
    sError("vgId:%d, log store not created", pNode->vgId);
×
1048
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1049
  }
1050
  if (pNode->pFsm == NULL) {
13,529!
1051
    sError("vgId:%d, pFsm not registered", pNode->vgId);
×
1052
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1053
  }
1054
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
13,529!
1055
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
×
1056
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1057
  }
1058
  SSnapshot snapshot = {0};
13,529✔
1059
  // TODO check return value
1060
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
13,529✔
1061

1062
  SyncIndex commitIndex = snapshot.lastApplyIndex;
13,529✔
1063
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
13,529✔
1064
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
13,529✔
1065
  if ((lastVer < commitIndex || firstVer > commitIndex + 1) || pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
13,529!
1066
    if ((code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) != 0) {
×
1067
      sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
×
1068
             pNode->vgId, terrstr(), lastVer, commitIndex);
1069
      TAOS_RETURN(code);
×
1070
    }
1071
  }
1072
  TAOS_RETURN(code);
13,529✔
1073
}
1074

1075
// open/close --------------
1076
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
13,522✔
1077
  int32_t    code = 0;
13,522✔
1078
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
13,522!
1079
  if (pSyncNode == NULL) {
13,529!
1080
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1081
    goto _error;
×
1082
  }
1083

1084
  if (!taosDirExist((char*)(pSyncInfo->path))) {
13,529✔
1085
    if (taosMkDir(pSyncInfo->path) != 0) {
10,816!
1086
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1087
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
×
1088
      goto _error;
×
1089
    }
1090
  }
1091

1092
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
13,527✔
1093
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
13,527✔
1094
           TD_DIRSEP);
1095
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
13,527✔
1096

1097
  if (!taosCheckExistFile(pSyncNode->configPath)) {
13,527✔
1098
    // create a new raft config file
1099
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
10,816✔
1100
    pSyncNode->vgId = pSyncInfo->vgId;
10,816✔
1101
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
10,816✔
1102
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
10,816✔
1103
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
10,816✔
1104
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
10,816✔
1105
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
10,816✔
1106
    pSyncNode->raftCfg.configIndexCount = 1;
10,816✔
1107
    pSyncNode->raftCfg.configIndexArr[0] = -1;
10,816✔
1108

1109
    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
10,816!
1110
      terrno = code;
×
1111
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
×
1112
      goto _error;
×
1113
    }
1114
  } else {
1115
    // update syncCfg by raft_config.json
1116
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
2,713!
1117
      terrno = code;
×
1118
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
×
1119
      goto _error;
×
1120
    }
1121

1122
    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
2,713✔
1123
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
1,679!
1124
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
1,384!
1125
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
1,384✔
1126
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
1,384!
1127
          terrno = code;
×
1128
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
×
1129
          goto _error;
×
1130
        }
1131
      } else {
1132
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
295!
1133
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
295✔
1134
      }
1135
    } else {
1136
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
1,034!
1137
            pSyncInfo->syncCfg.changeVersion);
1138
    }
1139
  }
1140

1141
  // init by SSyncInfo
1142
  pSyncNode->vgId = pSyncInfo->vgId;
13,529✔
1143
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
13,529✔
1144
  bool      updated = false;
13,529✔
1145
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
13,529✔
1146
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
1147
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
33,688✔
1148
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
20,159✔
1149
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
20,159!
1150
      updated = true;
×
1151
    }
1152
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
20,159✔
1153
          pNode->nodeId, pNode->clusterId);
1154
  }
1155

1156
  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
13,529✔
1157
    if (updated) {
1,767!
1158
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
×
1159
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
×
1160
        terrno = code;
×
1161
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
×
1162
        goto _error;
×
1163
      }
1164
    }
1165
  }
1166

1167
  pSyncNode->pWal = pSyncInfo->pWal;
13,529✔
1168
  pSyncNode->msgcb = pSyncInfo->msgcb;
13,529✔
1169
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
13,529✔
1170
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
13,529✔
1171
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
13,529✔
1172

1173
  // create raft log ring buffer
1174
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
13,529✔
1175
  if (pSyncNode->pLogBuf == NULL) {
13,529✔
1176
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
1!
1177
    goto _error;
×
1178
  }
1179

1180
  // init replicaNum, replicasId
1181
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
13,528✔
1182
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
13,528✔
1183
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
33,687✔
1184
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
20,159!
1185
        false) {
1186
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
×
1187
      goto _error;
×
1188
    }
1189
  }
1190

1191
  // init internal
1192
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
13,528✔
1193
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
13,528✔
1194

1195
  // init peersNum, peers, peersId
1196
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
13,528✔
1197
  int32_t j = 0;
13,528✔
1198
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
33,687✔
1199
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
20,159✔
1200
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
6,630✔
1201
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
6,630✔
1202
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
6,630✔
1203
      j++;
6,630✔
1204
    }
1205
  }
1206

1207
  pSyncNode->arbTerm = -1;
13,528✔
1208
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
13,528✔
1209
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
13,529✔
1210
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
13,528✔
1211

1212
  // init raft algorithm
1213
  pSyncNode->pFsm = pSyncInfo->pFsm;
13,529✔
1214
  pSyncInfo->pFsm = NULL;
13,529✔
1215
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
13,529✔
1216
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
13,529✔
1217

1218
  // init life cycle outside
1219

1220
  // TLA+ Spec
1221
  // InitHistoryVars == /\ elections = {}
1222
  //                    /\ allLogs   = {}
1223
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
1224
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
1225
  //                   /\ state       = [i \in Server |-> Follower]
1226
  //                   /\ votedFor    = [i \in Server |-> Nil]
1227
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
1228
  //                      /\ votesGranted   = [i \in Server |-> {}]
1229
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
1230
  // \* leader does not send itself messages. It's still easier to include these
1231
  // \* in the functions.
1232
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
1233
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
1234
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
1235
  //                /\ commitIndex  = [i \in Server |-> 0]
1236
  // Init == /\ messages = [m \in {} |-> 0]
1237
  //         /\ InitHistoryVars
1238
  //         /\ InitServerVars
1239
  //         /\ InitCandidateVars
1240
  //         /\ InitLeaderVars
1241
  //         /\ InitLogVars
1242
  //
1243

1244
  // init TLA+ server vars
1245
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
13,529✔
1246
  pSyncNode->roleTimeMs = taosGetTimestampMs();
13,529✔
1247
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
13,529!
1248
    terrno = code;
×
1249
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
×
1250
    goto _error;
×
1251
  }
1252

1253
  // init TLA+ candidate vars
1254
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
13,529✔
1255
  if (pSyncNode->pVotesGranted == NULL) {
13,528!
1256
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
×
1257
    goto _error;
×
1258
  }
1259
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
13,528✔
1260
  if (pSyncNode->pVotesRespond == NULL) {
13,527!
1261
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
×
1262
    goto _error;
×
1263
  }
1264

1265
  // init TLA+ leader vars
1266
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
13,527✔
1267
  if (pSyncNode->pNextIndex == NULL) {
13,527!
1268
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1269
    goto _error;
×
1270
  }
1271
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
13,527✔
1272
  if (pSyncNode->pMatchIndex == NULL) {
13,529!
1273
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1274
    goto _error;
×
1275
  }
1276

1277
  // init TLA+ log vars
1278
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
13,529✔
1279
  if (pSyncNode->pLogStore == NULL) {
13,527!
1280
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
×
1281
    goto _error;
×
1282
  }
1283

1284
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
13,527✔
1285
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
13,527!
1286
    SSnapshot snapshot = {0};
13,529✔
1287
    // TODO check return value
1288
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
13,529✔
1289
    if (snapshot.lastApplyIndex > commitIndex) {
13,528✔
1290
      commitIndex = snapshot.lastApplyIndex;
1,323✔
1291
      sNTrace(pSyncNode, "reset commit index by snapshot");
1,323!
1292
    }
1293
    pSyncNode->fsmState = snapshot.state;
13,528✔
1294
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
13,528!
1295
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
×
1296
      if (pSyncNode->replicaNum == 1) {
×
1297
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1298
        goto _error;
×
1299
      }
1300
    }
1301
  }
1302
  pSyncNode->commitIndex = commitIndex;
13,526✔
1303
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
13,526✔
1304

1305
  // restore log store on need
1306
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
13,527!
1307
    terrno = code;
×
1308
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
×
1309
    goto _error;
×
1310
  }
1311

1312
  // timer ms init
1313
  pSyncNode->pingBaseLine = PING_TIMER_MS;
13,529✔
1314
  pSyncNode->electBaseLine = tsElectInterval;
13,529✔
1315
  pSyncNode->hbBaseLine = tsHeartbeatInterval;
13,529✔
1316

1317
  // init ping timer
1318
  pSyncNode->pPingTimer = NULL;
13,529✔
1319
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
13,529✔
1320
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
13,529✔
1321
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
13,529✔
1322
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
13,529✔
1323
  pSyncNode->pingTimerCounter = 0;
13,529✔
1324

1325
  // init elect timer
1326
  pSyncNode->pElectTimer = NULL;
13,529✔
1327
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
13,529✔
1328
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
13,529✔
1329
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
13,529✔
1330
  pSyncNode->electTimerCounter = 0;
13,529✔
1331

1332
  // init heartbeat timer
1333
  pSyncNode->pHeartbeatTimer = NULL;
13,529✔
1334
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
13,529✔
1335
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
13,529✔
1336
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
13,529✔
1337
#ifdef BUILD_NO_CALL
1338
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
1339
#endif
1340
  pSyncNode->heartbeatTimerCounter = 0;
13,529✔
1341

1342
  // init peer heartbeat timer
1343
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
216,461✔
1344
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
202,932!
1345
      terrno = code;
×
1346
      goto _error;
×
1347
    }
1348
  }
1349

1350
  // tools
1351
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
13,529!
1352
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1353
    goto _error;
×
1354
  }
1355
  if (pSyncNode->pSyncRespMgr == NULL) {
13,529!
1356
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1357
    goto _error;
×
1358
  }
1359

1360
  // restore state
1361
  pSyncNode->restoreFinish = false;
13,529✔
1362

1363
  // snapshot senders
1364
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
216,381✔
1365
    SSyncSnapshotSender* pSender = NULL;
202,852✔
1366
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
202,852✔
1367
    if (pSender == NULL) return NULL;
202,840!
1368

1369
    pSyncNode->senders[i] = pSender;
202,840✔
1370
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
202,840✔
1371
  }
1372

1373
  // snapshot receivers
1374
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
13,529✔
1375
  if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
13,529!
1376
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
13,529✔
1377
          pSyncNode->pNewNodeReceiver);
1378

1379
  // is config changing
1380
  pSyncNode->changing = false;
13,529✔
1381

1382
  // replication mgr
1383
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
13,529!
1384
    terrno = code;
×
1385
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
×
1386
    goto _error;
×
1387
  }
1388

1389
  // peer state
1390
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
13,529!
1391
    terrno = code;
×
1392
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
×
1393
    goto _error;
×
1394
  }
1395

1396
  //
1397
  // min match index
1398
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
13,529✔
1399

1400
  // start in syncNodeStart
1401
  // start raft
1402

1403
  int64_t timeNow = taosGetTimestampMs();
13,529✔
1404
  pSyncNode->startTime = timeNow;
13,529✔
1405
  pSyncNode->lastReplicateTime = timeNow;
13,529✔
1406

1407
  // snapshotting
1408
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
13,529✔
1409

1410
  // init log buffer
1411
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
13,529!
1412
    terrno = code;
×
1413
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
×
1414
    goto _error;
×
1415
  }
1416

1417
  pSyncNode->isStart = true;
13,529✔
1418
  pSyncNode->electNum = 0;
13,529✔
1419
  pSyncNode->becomeLeaderNum = 0;
13,529✔
1420
  pSyncNode->becomeAssignedLeaderNum = 0;
13,529✔
1421
  pSyncNode->configChangeNum = 0;
13,529✔
1422
  pSyncNode->hbSlowNum = 0;
13,529✔
1423
  pSyncNode->hbrSlowNum = 0;
13,529✔
1424
  pSyncNode->tmrRoutineNum = 0;
13,529✔
1425

1426
  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
13,529✔
1427
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
1428
  return pSyncNode;
13,529✔
1429

1430
_error:
×
1431
  if (pSyncInfo->pFsm) {
×
1432
    taosMemoryFree(pSyncInfo->pFsm);
×
1433
    pSyncInfo->pFsm = NULL;
×
1434
  }
1435
  syncNodeClose(pSyncNode);
×
1436
  pSyncNode = NULL;
×
1437
  return NULL;
×
1438
}
1439

1440
#ifdef BUILD_NO_CALL
1441
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
1442
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
1443
    SSnapshot snapshot = {0};
1444
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
1445
    if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
1446
      pSyncNode->commitIndex = snapshot.lastApplyIndex;
1447
    }
1448
  }
1449
}
1450
#endif
1451

1452
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
13,528✔
1453
  int32_t code = 0;
13,528✔
1454
  if (pSyncNode->pLogStore == NULL) {
13,528!
1455
    sError("vgId:%d, log store not created", pSyncNode->vgId);
×
1456
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1457
  }
1458
  if (pSyncNode->pLogBuf == NULL) {
13,528!
1459
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
×
1460
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1461
  }
1462

1463
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
13,528✔
1464
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
13,527✔
1465
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
13,528✔
1466
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
13,528✔
1467
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);
13,528✔
1468

1469
  if (lastVer != -1 && endIndex != lastVer + 1) {
13,528!
1470
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
×
1471
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
×
1472
          pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
1473
    // TAOS_RETURN(code);
1474
  }
1475

1476
  // if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
1477
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
13,528✔
1478
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
13,528✔
1479

1480
  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
27,056!
1481
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore")) < 0) {
13,528✔
1482
    TAOS_RETURN(code);
×
1483
  }
1484

1485
  TAOS_RETURN(code);
13,528✔
1486
}
1487

1488
int32_t syncNodeStart(SSyncNode* pSyncNode) {
13,528✔
1489
  // start raft
1490
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);
13,528✔
1491
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
13,528✔
1492
    syncNodeBecomeLearner(pSyncNode, "first start");
262✔
1493
  } else {
1494
    if (pSyncNode->replicaNum == 1) {
13,266✔
1495
      raftStoreNextTerm(pSyncNode);
10,360✔
1496
      syncNodeBecomeLeader(pSyncNode, "one replica start");
10,360✔
1497

1498
      // Raft 3.6.2 Committing entries from previous terms
1499
      TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
10,360!
1500
    } else {
1501
      SRaftId id = {0};
2,906✔
1502
      syncNodeBecomeFollower(pSyncNode, id, "first start");
2,906✔
1503
    }
1504
  }
1505

1506
  int32_t ret = 0;
13,528✔
1507
  ret = syncNodeStartPingTimer(pSyncNode);
13,528✔
1508
  if (ret != 0) {
13,528!
1509
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret));
×
1510
  }
1511
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
13,528✔
1512
  return ret;
13,528✔
1513
}
1514

1515
#ifdef BUILD_NO_CALL
1516
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
1517
  // state change
1518
  int32_t code = 0;
1519
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
1520
  pSyncNode->roleTimeMs = taosGetTimestampMs();
1521
  // TODO check return value
1522
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
1523

1524
  // reset elect timer, long enough
1525
  int32_t electMS = TIMER_MAX_MS;
1526
  code = syncNodeRestartElectTimer(pSyncNode, electMS);
1527
  if (code < 0) {
1528
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
1529
    return -1;
1530
  }
1531

1532
  code = syncNodeStartPingTimer(pSyncNode);
1533
  if (code < 0) {
1534
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
1535
    return -1;
1536
  }
1537
  return code;
1538
}
1539
#endif
1540

1541
void syncNodePreClose(SSyncNode* pSyncNode) {
13,528✔
1542
  int32_t code = 0;
13,528✔
1543
  if (pSyncNode == NULL) {
13,528!
1544
    sError("failed to pre close sync node since sync node is null");
×
1545
    return;
×
1546
  }
1547
  if (pSyncNode->pFsm == NULL) {
13,528!
1548
    sError("failed to pre close sync node since fsm is null");
×
1549
    return;
×
1550
  }
1551
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
13,528!
1552
    sError("failed to pre close sync node since FpApplyQueueItems is null");
×
1553
    return;
×
1554
  }
1555

1556
  // stop elect timer
1557
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
13,528!
1558
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
1559
    return;
×
1560
  }
1561

1562
  // stop heartbeat timer
1563
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
13,527!
1564
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
1565
    return;
×
1566
  }
1567

1568
  // stop ping timer
1569
  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
13,527!
1570
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
×
1571
    return;
×
1572
  }
1573

1574
  // clean rsp
1575
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
13,528✔
1576
}
1577

1578
void syncNodePostClose(SSyncNode* pSyncNode) {
11,762✔
1579
  if (pSyncNode->pNewNodeReceiver != NULL) {
11,762!
1580
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
11,762!
1581
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
1582
    }
1583

1584
    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
11,762✔
1585
           pSyncNode->pNewNodeReceiver);
1586
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
11,762✔
1587
    pSyncNode->pNewNodeReceiver = NULL;
11,761✔
1588
  }
1589
}
11,761✔
1590

1591
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
2,415!
1592

1593
void syncNodeClose(SSyncNode* pSyncNode) {
13,527✔
1594
  int32_t code = 0;
13,527✔
1595
  if (pSyncNode == NULL) return;
13,527!
1596
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
13,527✔
1597

1598
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
13,527✔
1599

1600
  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
13,528!
1601
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
×
1602
    return;
×
1603
  }
1604
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
13,528!
1605
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
1606
    return;
×
1607
  }
1608
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
13,528!
1609
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
1610
    return;
×
1611
  }
1612
  syncNodeLogReplDestroy(pSyncNode);
13,528✔
1613

1614
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
13,528✔
1615
  pSyncNode->pSyncRespMgr = NULL;
13,524✔
1616
  voteGrantedDestroy(pSyncNode->pVotesGranted);
13,524✔
1617
  pSyncNode->pVotesGranted = NULL;
13,527✔
1618
  votesRespondDestory(pSyncNode->pVotesRespond);
13,527✔
1619
  pSyncNode->pVotesRespond = NULL;
13,527✔
1620
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
13,527✔
1621
  pSyncNode->pNextIndex = NULL;
13,525✔
1622
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
13,525✔
1623
  pSyncNode->pMatchIndex = NULL;
13,525✔
1624
  logStoreDestory(pSyncNode->pLogStore);
13,525✔
1625
  pSyncNode->pLogStore = NULL;
13,527✔
1626
  syncLogBufferDestroy(pSyncNode->pLogBuf);
13,527✔
1627
  pSyncNode->pLogBuf = NULL;
13,528✔
1628

1629
  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);
13,528✔
1630

1631
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
216,420✔
1632
    if (pSyncNode->senders[i] != NULL) {
202,892✔
1633
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
202,890✔
1634

1635
      if (snapshotSenderIsStart(pSyncNode->senders[i])) {
202,892!
1636
        snapshotSenderStop(pSyncNode->senders[i], false);
×
1637
      }
1638

1639
      snapshotSenderDestroy(pSyncNode->senders[i]);
202,887✔
1640
      pSyncNode->senders[i] = NULL;
202,898✔
1641
    }
1642
  }
1643

1644
  if (pSyncNode->pNewNodeReceiver != NULL) {
13,528✔
1645
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
1,766!
1646
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
1647
    }
1648

1649
    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
1,766✔
1650
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
1,766✔
1651
    pSyncNode->pNewNodeReceiver = NULL;
1,766✔
1652
  }
1653

1654
  if (pSyncNode->pFsm != NULL) {
13,528!
1655
    taosMemoryFree(pSyncNode->pFsm);
13,528!
1656
  }
1657

1658
  raftStoreClose(pSyncNode);
13,528✔
1659

1660
  taosMemoryFree(pSyncNode);
13,528!
1661
}
1662

1663
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg.snapshotStrategy; }
×
1664

1665
// timer control --------------
1666
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
13,528✔
1667
  int32_t code = 0;
13,528✔
1668
  if (syncIsInit()) {
13,528!
1669
    bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
13,528✔
1670
                                        syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
13,528✔
1671
    if (stopped) {
13,528!
1672
      sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
×
1673
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1674
    }
1675
    atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
13,528✔
1676
  } else {
1677
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
×
1678
  }
1679
  return code;
13,528✔
1680
}
1681

1682
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
27,055✔
1683
  int32_t code = 0;
27,055✔
1684
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
27,055✔
1685
  bool stop = taosTmrStop(pSyncNode->pPingTimer);
27,056✔
1686
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stop);
27,056✔
1687
  pSyncNode->pPingTimer = NULL;
27,056✔
1688
  return code;
27,056✔
1689
}
1690

1691
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
393,041✔
1692
  int32_t code = 0;
393,041✔
1693
  if (syncIsInit()) {
393,041!
1694
    pSyncNode->electTimerMS = ms;
393,041✔
1695

1696
    int64_t execTime = taosGetTimestampMs() + ms;
393,040✔
1697
    atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime);
393,040✔
1698
    atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
393,042✔
1699
    pSyncNode->electTimerParam.pSyncNode = pSyncNode;
393,043✔
1700
    pSyncNode->electTimerParam.pData = NULL;
393,043✔
1701

1702
    bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
393,043✔
1703
                                syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
393,043✔
1704
    if (stopped) sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
393,044!
1705
  } else {
1706
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
×
1707
  }
1708
  return code;
393,044✔
1709
}
1710

1711
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
431,461✔
1712
  int32_t code = 0;
431,461✔
1713
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
431,461✔
1714
  bool stop = taosTmrStop(pSyncNode->pElectTimer);
431,460✔
1715
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop);
431,458✔
1716
  pSyncNode->pElectTimer = NULL;
431,459✔
1717

1718
  return code;
431,459✔
1719
}
1720

1721
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
393,044✔
1722
  int32_t ret = 0;
393,044✔
1723
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
393,044!
1724
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
393,043!
1725
  return ret;
393,044✔
1726
}
1727

1728
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
393,040✔
1729
  int32_t code = 0;
393,040✔
1730
  int32_t electMS;
1731

1732
  if (pSyncNode->raftCfg.isStandBy) {
393,040!
1733
    electMS = TIMER_MAX_MS;
×
1734
  } else {
1735
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
393,040✔
1736
  }
1737

1738
  if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) {
393,044!
1739
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
1740
    return;
×
1741
  };
1742

1743
  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
393,044!
1744
          electMS);
1745
}
1746

1747
#ifdef BUILD_NO_CALL
1748
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
1749
  int32_t code = 0;
1750
  if (syncIsInit()) {
1751
    TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
1752
                                   syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
1753
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
1754
  } else {
1755
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
1756
  }
1757

1758
  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
1759
  return code;
1760
}
1761
#endif
1762

1763
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
12,840✔
1764
  int32_t ret = 0;
12,840✔
1765

1766
#if 0
1767
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
1768
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
1769
#endif
1770

1771
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
15,259✔
1772
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
2,419✔
1773
    if (pSyncTimer != NULL) {
2,419!
1774
      TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pSyncTimer));
2,419!
1775
    }
1776
  }
1777

1778
  return ret;
12,840✔
1779
}
1780

1781
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
33,600✔
1782
  int32_t code = 0;
33,600✔
1783

1784
#if 0
1785
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
1786
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
1787
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
1788
  pSyncNode->pHeartbeatTimer = NULL;
1789
#endif
1790

1791
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
58,025✔
1792
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
24,426✔
1793
    if (pSyncTimer != NULL) {
24,425!
1794
      TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pSyncTimer));
24,425!
1795
    }
1796
  }
1797

1798
  return code;
33,599✔
1799
}
1800

1801
#ifdef BUILD_NO_CALL
1802
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
1803
  // TODO check return value
1804
  int32_t code = 0;
1805
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
1806
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
1807
  return 0;
1808
}
1809
#endif
1810

1811
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
2,465,904✔
1812
  SEpSet* epSet = NULL;
2,465,904✔
1813
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
2,850,130✔
1814
    if (destRaftId->addr == pNode->peersId[i].addr) {
2,850,086✔
1815
      epSet = &pNode->peersEpset[i];
2,465,860✔
1816
      break;
2,465,860✔
1817
    }
1818
  }
1819

1820
  int32_t code = -1;
2,465,904✔
1821
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
2,465,904!
1822
    syncUtilMsgHtoN(pMsg->pCont);
2,465,865✔
1823
    pMsg->info.noResp = 1;
2,465,845✔
1824
    code = pNode->syncSendMSg(epSet, pMsg);
2,465,845✔
1825
  }
1826

1827
  if (code < 0) {
2,465,948✔
1828
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
80!
1829
           epSet, DID(destRaftId), destRaftId->addr);
1830
    rpcFreeCont(pMsg->pCont);
80✔
1831
  }
1832

1833
  TAOS_RETURN(code);
2,465,948✔
1834
}
1835

1836
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
2,284✔
1837
  bool b1 = false;
2,284✔
1838
  bool b2 = false;
2,284✔
1839

1840
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
2,764!
1841
    if (strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
2,764!
1842
        pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort) {
2,764✔
1843
      b1 = true;
2,284✔
1844
      break;
2,284✔
1845
    }
1846
  }
1847

1848
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
2,764!
1849
    SRaftId raftId = {
2,764✔
1850
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
2,764✔
1851
        .vgId = pNode->vgId,
2,764✔
1852
    };
1853

1854
    if (syncUtilSameId(&raftId, &pNode->myRaftId)) {
2,764✔
1855
      b2 = true;
2,284✔
1856
      break;
2,284✔
1857
    }
1858
  }
1859

1860
  if (b1 != b2) {
2,284!
1861
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1862
    return false;
×
1863
  }
1864
  return b1;
2,284✔
1865
}
1866

1867
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,062✔
1868
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,062✔
1869
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,083✔
1870
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,002✔
1871
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,627✔
1872
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,627✔
1873
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,627!
1874
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,627✔
1875
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,625✔
1876
  }
1877

1878
  return false;
1,375✔
1879
}
1880

1881
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
1,678✔
1882
  int32_t  code = 0;
1,678✔
1883
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
1,678✔
1884
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
1,678✔
1885
    sInfo("vgId:1, sync not reconfig since not changed");
1,375✔
1886
    return 0;
1,375✔
1887
  }
1888

1889
  pSyncNode->raftCfg.cfg = *pNewConfig;
303✔
1890
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;
303✔
1891

1892
  pSyncNode->configChangeNum++;
303✔
1893

1894
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
303✔
1895
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
303✔
1896

1897
  bool isDrop = false;
303✔
1898
  bool isAdd = false;
303✔
1899

1900
  if (IamInOld && !IamInNew) {
303!
1901
    isDrop = true;
×
1902
  } else {
1903
    isDrop = false;
303✔
1904
  }
1905

1906
  if (!IamInOld && IamInNew) {
303!
1907
    isAdd = true;
×
1908
  } else {
1909
    isAdd = false;
303✔
1910
  }
1911

1912
  // log begin config change
1913
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
303!
1914
         pSyncNode->vgId, oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex,
1915
         pNewConfig->lastIndex);
1916

1917
  if (IamInNew) {
303!
1918
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
303✔
1919
  }
1920
  if (isDrop) {
303!
1921
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
×
1922
  }
1923

1924
  // add last config index
1925
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
303✔
1926
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
303!
1927
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
×
1928
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
1929
    return -1;
×
1930
  }
1931

1932
  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
303✔
1933
  pCfg->configIndexCount++;
303✔
1934

1935
  if (IamInNew) {
303!
1936
    //-----------------------------------------
1937
    int32_t ret = 0;
303✔
1938

1939
    // save snapshot senders
1940
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
1941
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
303✔
1942
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
1943
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
4,848✔
1944
      oldSenders[i] = pSyncNode->senders[i];
4,545✔
1945
      sSTrace(oldSenders[i], "snapshot sender save old");
4,545!
1946
    }
1947

1948
    // init internal
1949
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
303✔
1950
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;
303!
1951

1952
    // init peersNum, peers, peersId
1953
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
303✔
1954
    int32_t j = 0;
303✔
1955
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
1,098✔
1956
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
795✔
1957
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
492✔
1958
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
492✔
1959
        j++;
492✔
1960
      }
1961
    }
1962
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
795✔
1963
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
492!
1964
        return terrno;
×
1965
    }
1966

1967
    // init replicaNum, replicasId
1968
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
303✔
1969
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
303✔
1970
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
1,098✔
1971
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
795!
1972
          false)
1973
        return terrno;
×
1974
    }
1975

1976
    // update quorum first
1977
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
303✔
1978

1979
    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
303✔
1980
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
303✔
1981
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
303✔
1982
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
303✔
1983

1984
    // reset snapshot senders
1985

1986
    // clear new
1987
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
4,848✔
1988
      pSyncNode->senders[i] = NULL;
4,545✔
1989
    }
1990

1991
    // reset new
1992
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
1,098✔
1993
      // reset sender
1994
      bool reset = false;
795✔
1995
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
3,396✔
1996
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
3,253!
1997
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
652!
1998
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);
1999

2000
          pSyncNode->senders[i] = oldSenders[j];
652✔
2001
          oldSenders[j] = NULL;
652✔
2002
          reset = true;
652✔
2003

2004
          // reset replicaIndex
2005
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
652✔
2006
          pSyncNode->senders[i]->replicaIndex = i;
652✔
2007

2008
          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
652!
2009
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);
2010

2011
          break;
652✔
2012
        }
2013
      }
2014
    }
2015

2016
    // create new
2017
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
4,848✔
2018
      if (pSyncNode->senders[i] == NULL) {
4,545✔
2019
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
3,893!
2020
        if (pSyncNode->senders[i] == NULL) {
3,893!
2021
          // will be created later while send snapshot
2022
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
×
2023
        } else {
2024
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
3,893✔
2025
        }
2026
      } else {
2027
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
652✔
2028
      }
2029
    }
2030

2031
    // free old
2032
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
4,848✔
2033
      if (oldSenders[i] != NULL) {
4,545✔
2034
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
3,893✔
2035
        snapshotSenderDestroy(oldSenders[i]);
3,893✔
2036
        oldSenders[i] = NULL;
3,893✔
2037
      }
2038
    }
2039

2040
    // persist cfg
2041
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
303!
2042
  } else {
2043
    // persist cfg
2044
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
×
2045
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
×
2046
  }
2047

2048
_END:
×
2049
  // log end config change
2050
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
303!
2051
  return 0;
303✔
2052
}
2053

2054
// raft state change --------------
2055
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
6,426✔
2056
  if (term > raftStoreGetTerm(pSyncNode)) {
6,426!
2057
    raftStoreSetTerm(pSyncNode, term);
×
2058
  }
2059
}
6,426✔
2060

2061
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
343,253✔
2062
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
343,253✔
2063
  if (currentTerm > newTerm) {
343,254!
2064
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
×
2065
    return;
×
2066
  }
2067

2068
  do {
2069
    sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
343,254!
2070
  } while (0);
2071

2072
  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
343,254!
2073
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
×
2074
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
×
2075
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
×
2076
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
×
2077
  }
2078

2079
  if (currentTerm < newTerm) {
343,252✔
2080
    raftStoreSetTerm(pSyncNode, newTerm);
2,142✔
2081
    char tmpBuf[64];
2082
    snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
2,142✔
2083
    syncNodeBecomeFollower(pSyncNode, id, tmpBuf);
2,142✔
2084
    raftStoreClearVote(pSyncNode);
2,142✔
2085
  } else {
2086
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
341,110✔
2087
      syncNodeBecomeFollower(pSyncNode, id, "step down");
3✔
2088
    }
2089
  }
2090
}
2091

2092
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
5,066✔
2093

2094
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
5,066✔
2095
  int32_t code = 0;  // maybe clear leader cache
5,066✔
2096
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
5,066✔
2097
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
30✔
2098
  }
2099

2100
  pSyncNode->hbSlowNum = 0;
5,066✔
2101

2102
  pSyncNode->leaderCache = leaderId;  // state change
5,066✔
2103
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
5,066✔
2104
  pSyncNode->roleTimeMs = taosGetTimestampMs();
5,066✔
2105
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
5,066!
2106
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2107
    return;
×
2108
  }
2109

2110
  // trace log
2111
  sNTrace(pSyncNode, "become follower %s", debugStr);
5,066!
2112

2113
  // send rsp to client
2114
  syncNodeLeaderChangeRsp(pSyncNode);
5,066✔
2115

2116
  // call back
2117
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
5,066!
2118
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
5,066✔
2119
  }
2120

2121
  // min match index
2122
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
5,066✔
2123

2124
  // reset log buffer
2125
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
5,066!
2126
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2127
    return;
×
2128
  }
2129

2130
  // reset elect timer
2131
  syncNodeResetElectTimer(pSyncNode);
5,065✔
2132

2133
  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
5,066!
2134
}
2135

2136
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
262✔
2137
  pSyncNode->hbSlowNum = 0;
262✔
2138

2139
  // state change
2140
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
262✔
2141
  pSyncNode->roleTimeMs = taosGetTimestampMs();
262✔
2142

2143
  // trace log
2144
  sNTrace(pSyncNode, "become learner %s", debugStr);
262!
2145

2146
  // call back
2147
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
262!
2148
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
262✔
2149
  }
2150

2151
  // min match index
2152
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
262✔
2153

2154
  // reset log buffer
2155
  int32_t code = 0;
262✔
2156
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
262!
2157
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2158
    return;
×
2159
  };
2160
}
2161

2162
// TLA+ Spec
2163
// \* Candidate i transitions to leader.
2164
// BecomeLeader(i) ==
2165
//     /\ state[i] = Candidate
2166
//     /\ votesGranted[i] \in Quorum
2167
//     /\ state'      = [state EXCEPT ![i] = Leader]
2168
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2169
//                          [j \in Server |-> Len(log[i]) + 1]]
2170
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2171
//                          [j \in Server |-> 0]]
2172
//     /\ elections'  = elections \cup
2173
//                          {[eterm     |-> currentTerm[i],
2174
//                            eleader   |-> i,
2175
//                            elog      |-> log[i],
2176
//                            evotes    |-> votesGranted[i],
2177
//                            evoterLog |-> voterLog[i]]}
2178
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2179
//
2180
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
11,361✔
2181
  int32_t code = 0;
11,361✔
2182
  pSyncNode->becomeLeaderNum++;
11,361✔
2183
  pSyncNode->hbrSlowNum = 0;
11,361✔
2184

2185
  // reset restoreFinish
2186
  pSyncNode->restoreFinish = false;
11,361✔
2187

2188
  // state change
2189
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
11,361✔
2190
  pSyncNode->roleTimeMs = taosGetTimestampMs();
11,361✔
2191

2192
  // set leader cache
2193
  pSyncNode->leaderCache = pSyncNode->myRaftId;
11,361✔
2194

2195
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
24,689✔
2196
    SyncIndex lastIndex;
2197
    SyncTerm  lastTerm;
2198
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
13,328✔
2199
    if (code != 0) {
13,328!
2200
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
×
2201
      return;
×
2202
    }
2203
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
13,328✔
2204
  }
2205

2206
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
24,689✔
2207
    // maybe overwrite myself, no harm
2208
    // just do it!
2209
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
13,328✔
2210
  }
2211

2212
  // init peer mgr
2213
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
11,361!
2214
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2215
    return;
×
2216
  }
2217

2218
#if 0
2219
  // update sender private term
2220
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
2221
  if (pMySender != NULL) {
2222
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
2223
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
2224
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
2225
      }
2226
    }
2227
    (pMySender->privateTerm) += 100;
2228
  }
2229
#endif
2230

2231
  // close receiver
2232
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
11,361!
2233
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2234
  }
2235

2236
  // stop elect timer
2237
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
11,361!
2238
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2239
    return;
×
2240
  }
2241

2242
  // start heartbeat timer
2243
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
11,361!
2244
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2245
    return;
×
2246
  }
2247

2248
  // send heartbeat right now
2249
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
11,361!
2250
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2251
    return;
×
2252
  }
2253

2254
  // call back
2255
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
11,361!
2256
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
11,361✔
2257
  }
2258

2259
  // min match index
2260
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
11,361✔
2261

2262
  // reset log buffer
2263
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
11,361!
2264
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2265
    return;
×
2266
  }
2267

2268
  // trace log
2269
  sNInfo(pSyncNode, "become leader %s", debugStr);
11,361✔
2270
}
2271

2272
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
×
2273
  int32_t code = 0;
×
2274
  pSyncNode->becomeAssignedLeaderNum++;
×
2275
  pSyncNode->hbrSlowNum = 0;
×
2276

2277
  // reset restoreFinish
2278
  // pSyncNode->restoreFinish = false;
2279

2280
  // state change
2281
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
×
2282
  pSyncNode->roleTimeMs = taosGetTimestampMs();
×
2283

2284
  // set leader cache
2285
  pSyncNode->leaderCache = pSyncNode->myRaftId;
×
2286

2287
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
×
2288
    SyncIndex lastIndex;
2289
    SyncTerm  lastTerm;
2290
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
×
2291
    if (code != 0) {
×
2292
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
×
2293
      return;
×
2294
    }
2295
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
×
2296
  }
2297

2298
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
×
2299
    // maybe overwrite myself, no harm
2300
    // just do it!
2301
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
×
2302
  }
2303

2304
  // init peer mgr
2305
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
×
2306
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2307
    return;
×
2308
  }
2309

2310
  // close receiver
2311
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
×
2312
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2313
  }
2314

2315
  // stop elect timer
2316
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
×
2317
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2318
    return;
×
2319
  }
2320

2321
  // start heartbeat timer
2322
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
×
2323
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2324
    return;
×
2325
  }
2326

2327
  // send heartbeat right now
2328
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
×
2329
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2330
    return;
×
2331
  }
2332

2333
  // call back
2334
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
×
2335
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
×
2336
  }
2337

2338
  // min match index
2339
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
×
2340

2341
  // reset log buffer
2342
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
×
2343
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2344
    return;
×
2345
  }
2346

2347
  // trace log
2348
  sNInfo(pSyncNode, "become assigned leader");
×
2349
}
2350

2351
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
1,001✔
2352
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
1,001!
2353
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
×
2354
    return;
×
2355
  }
2356
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
1,001✔
2357
  if (!granted) {
1,001!
2358
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
×
2359
    return;
×
2360
  }
2361
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
1,001✔
2362

2363
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");
1,001!
2364

2365
  int32_t ret = syncNodeAppendNoop(pSyncNode);
1,001✔
2366
  if (ret < 0) {
1,001!
2367
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
×
2368
  }
2369

2370
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
1,001✔
2371

2372
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
1,001!
2373
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
2374
}
2375

2376
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
508,857✔
2377

2378
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
24,884✔
2379
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
398,223✔
2380
    pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID;
373,339✔
2381
    pSyncNode->peerStates[i].lastSendTime = 0;
373,339✔
2382
  }
2383

2384
  return 0;
24,884✔
2385
}
2386

2387
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
1,279✔
2388
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
1,279!
2389
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
×
2390
    return;
×
2391
  }
2392
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
1,279✔
2393
  pSyncNode->roleTimeMs = taosGetTimestampMs();
1,279✔
2394
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
1,279✔
2395
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
1,279!
2396
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
2397

2398
  sNTrace(pSyncNode, "follower to candidate");
1,279!
2399
}
2400

2401
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
×
2402
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
×
2403
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
×
2404

2405
  sNTrace(pSyncNode, "assigned leader to leader");
×
2406

2407
  int32_t ret = syncNodeAppendNoop(pSyncNode);
×
2408
  if (ret < 0) {
×
2409
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
×
2410
  }
2411

2412
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
×
2413
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
×
2414
        "assigned commit index:%" PRId64 ", last index:%" PRId64,
2415
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
2416
        lastIndex);
2417
  return 0;
×
2418
}
2419

2420
// just called by syncNodeVoteForSelf
2421
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,351✔
2422
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,351✔
2423
  if (term != storeTerm) {
1,351!
2424
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2425
    return;
×
2426
  }
2427
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
1,351!
2428
  bool voted = raftStoreHasVoted(pSyncNode);
1,351✔
2429
  if (voted) {
1,351!
2430
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2431
    return;
×
2432
  }
2433

2434
  raftStoreVote(pSyncNode, pRaftId);
1,351✔
2435
}
2436

2437
// simulate get vote from outside
2438
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,351✔
2439
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,351✔
2440

2441
  SRpcMsg rpcMsg = {0};
1,351✔
2442
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,351✔
2443
  if (ret != 0) return;
1,351!
2444

2445
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,351✔
2446
  pMsg->srcId = pSyncNode->myRaftId;
1,351✔
2447
  pMsg->destId = pSyncNode->myRaftId;
1,351✔
2448
  pMsg->term = currentTerm;
1,351✔
2449
  pMsg->voteGranted = true;
1,351✔
2450

2451
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,351✔
2452
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,351✔
2453
  rpcFreeCont(rpcMsg.pCont);
1,351✔
2454
}
2455

2456
// return if has a snapshot
2457
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
18,016✔
2458
  bool      ret = false;
18,016✔
2459
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
18,016✔
2460
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
18,016!
2461
    // TODO check return value
2462
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
18,016✔
2463
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
18,016✔
2464
      ret = true;
2,227✔
2465
    }
2466
  }
2467
  return ret;
18,016✔
2468
}
2469

2470
// return max(logLastIndex, snapshotLastIndex)
2471
// if no snapshot and log, return -1
2472
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
18,113✔
2473
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
18,113✔
2474
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
18,113!
2475
    // TODO check return value
2476
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
18,113✔
2477
  }
2478
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
18,113✔
2479

2480
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
18,113✔
2481
  return lastIndex;
18,113✔
2482
}
2483

2484
// return the last term of snapshot and log
2485
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2486
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
18,016✔
2487
  SyncTerm lastTerm = 0;
18,016✔
2488
  if (syncNodeHasSnapshot(pSyncNode)) {
18,016✔
2489
    // has snapshot
2490
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,227✔
2491
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,227!
2492
      // TODO check return value
2493
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,227✔
2494
    }
2495

2496
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,227✔
2497
    if (logLastIndex > snapshot.lastApplyIndex) {
2,227✔
2498
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,289✔
2499
    } else {
2500
      lastTerm = snapshot.lastApplyTerm;
938✔
2501
    }
2502

2503
  } else {
2504
    // no snapshot
2505
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
15,789✔
2506
  }
2507

2508
  return lastTerm;
18,016✔
2509
}
2510

2511
// get last index and term along with snapshot
2512
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
15,906✔
2513
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
15,906✔
2514
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
15,906✔
2515
  return 0;
15,906✔
2516
}
2517

2518
#ifdef BUILD_NO_CALL
2519
// return append-entries first try index
2520
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2521
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2522
  return syncStartIndex;
2523
}
2524

2525
// if index > 0, return index - 1
2526
// else, return -1
2527
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2528
  SyncIndex preIndex = index - 1;
2529
  if (preIndex < SYNC_INDEX_INVALID) {
2530
    preIndex = SYNC_INDEX_INVALID;
2531
  }
2532

2533
  return preIndex;
2534
}
2535

2536
// if index < 0, return SYNC_TERM_INVALID
2537
// if index == 0, return 0
2538
// if index > 0, return preTerm
2539
// if error, return SYNC_TERM_INVALID
2540
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2541
  if (index < SYNC_INDEX_BEGIN) {
2542
    return SYNC_TERM_INVALID;
2543
  }
2544

2545
  if (index == SYNC_INDEX_BEGIN) {
2546
    return 0;
2547
  }
2548

2549
  SyncTerm  preTerm = 0;
2550
  SyncIndex preIndex = index - 1;
2551

2552
  SSyncRaftEntry* pPreEntry = NULL;
2553
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2554
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2555
  int32_t         code = 0;
2556
  if (h) {
2557
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2558
    code = 0;
2559

2560
    pSyncNode->pLogStore->cacheHit++;
2561
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2562

2563
  } else {
2564
    pSyncNode->pLogStore->cacheMiss++;
2565
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2566

2567
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2568
  }
2569

2570
  SSnapshot snapshot = {.data = NULL,
2571
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2572
                        .lastApplyTerm = SYNC_TERM_INVALID,
2573
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2574

2575
  if (code == 0) {
2576
    if (pPreEntry == NULL) return -1;
2577
    preTerm = pPreEntry->term;
2578

2579
    if (h) {
2580
      taosLRUCacheRelease(pCache, h, false);
2581
    } else {
2582
      syncEntryDestroy(pPreEntry);
2583
    }
2584

2585
    return preTerm;
2586
  } else {
2587
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2588
      // TODO check return value
2589
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2590
      if (snapshot.lastApplyIndex == preIndex) {
2591
        return snapshot.lastApplyTerm;
2592
      }
2593
    }
2594
  }
2595

2596
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2597
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2598
  return SYNC_TERM_INVALID;
2599
}
2600

2601
// get pre index and term of "index"
2602
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2603
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2604
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2605
  return 0;
2606
}
2607
#endif
2608

2609
static void syncNodeEqPingTimer(void* param, void* tmrId) {
239,310✔
2610
  if (!syncIsInit()) return;
239,310!
2611

2612
  int64_t    rid = (int64_t)param;
239,310✔
2613
  SSyncNode* pNode = syncNodeAcquire(rid);
239,310✔
2614

2615
  if (pNode == NULL) return;
239,310!
2616

2617
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
239,310!
2618
    SRpcMsg rpcMsg = {0};
239,310✔
2619
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
239,310✔
2620
                                    pNode->pingTimerMS, pNode);
2621
    if (code != 0) {
239,310!
2622
      sError("failed to build ping msg");
×
2623
      rpcFreeCont(rpcMsg.pCont);
×
2624
      goto _out;
×
2625
    }
2626

2627
    // sTrace("enqueue ping msg");
2628
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
239,310✔
2629
    if (code != 0) {
239,310!
UNCOV
2630
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
UNCOV
2631
      rpcFreeCont(rpcMsg.pCont);
×
UNCOV
2632
      goto _out;
×
2633
    }
2634

2635
  _out:
239,310✔
2636
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
239,310!
2637
                     &pNode->pPingTimer))
2638
      sError("failed to reset ping timer");
×
2639
  }
2640
  syncNodeRelease(pNode);
239,310✔
2641
}
2642

2643
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,356✔
2644
  if (!syncIsInit()) return;
1,359!
2645

2646
  int64_t    rid = (int64_t)param;
1,356✔
2647
  SSyncNode* pNode = syncNodeAcquire(rid);
1,356✔
2648

2649
  if (pNode == NULL) return;
1,356✔
2650

2651
  if (pNode->syncEqMsg == NULL) {
1,354!
2652
    syncNodeRelease(pNode);
×
2653
    return;
×
2654
  }
2655

2656
  int64_t tsNow = taosGetTimestampMs();
1,354✔
2657
  if (tsNow < pNode->electTimerParam.executeTime) {
1,354✔
2658
    syncNodeRelease(pNode);
1✔
2659
    return;
1✔
2660
  }
2661

2662
  SRpcMsg rpcMsg = {0};
1,353✔
2663
  int32_t code =
2664
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,353✔
2665

2666
  if (code != 0) {
1,353!
2667
    sError("failed to build elect msg");
×
2668
    syncNodeRelease(pNode);
×
2669
    return;
×
2670
  }
2671

2672
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,353✔
2673
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,353!
2674

2675
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,353✔
2676
  if (code != 0) {
1,353!
2677
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2678
    rpcFreeCont(rpcMsg.pCont);
×
2679
    syncNodeRelease(pNode);
×
2680
    return;
×
2681
  }
2682

2683
  syncNodeRelease(pNode);
1,353✔
2684
}
2685

2686
#ifdef BUILD_NO_CALL
2687
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2688
  if (!syncIsInit()) return;
2689

2690
  int64_t    rid = (int64_t)param;
2691
  SSyncNode* pNode = syncNodeAcquire(rid);
2692

2693
  if (pNode == NULL) return;
2694

2695
  if (pNode->totalReplicaNum > 1) {
2696
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2697
      SRpcMsg rpcMsg = {0};
2698
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2699
                                      pNode->heartbeatTimerMS, pNode);
2700

2701
      if (code != 0) {
2702
        sError("failed to build heartbeat msg");
2703
        goto _out;
2704
      }
2705

2706
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2707
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2708
      if (code != 0) {
2709
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2710
        rpcFreeCont(rpcMsg.pCont);
2711
        goto _out;
2712
      }
2713

2714
    _out:
2715
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2716
                       &pNode->pHeartbeatTimer) != 0)
2717
        return;
2718

2719
    } else {
2720
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2721
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2722
    }
2723
  }
2724
}
2725
#endif
2726

2727
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
42,630✔
2728
  int32_t code = 0;
42,630✔
2729
  int64_t hbDataRid = (int64_t)param;
42,630✔
2730
  int64_t tsNow = taosGetTimestampMs();
42,630✔
2731

2732
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
42,630✔
2733
  if (pData == NULL) {
42,630!
2734
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2735
    return;
×
2736
  }
2737

2738
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
42,630✔
2739
  if (pSyncNode == NULL) {
42,630✔
2740
    syncHbTimerDataRelease(pData);
3✔
2741
    sError("hb timer get pSyncNode NULL");
3!
2742
    return;
3✔
2743
  }
2744

2745
  SSyncTimer* pSyncTimer = pData->pTimer;
42,627✔
2746

2747
  if (!pSyncNode->isStart) {
42,627!
2748
    syncNodeRelease(pSyncNode);
×
2749
    syncHbTimerDataRelease(pData);
×
2750
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2751
    return;
×
2752
  }
2753

2754
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
42,627!
2755
    syncNodeRelease(pSyncNode);
×
2756
    syncHbTimerDataRelease(pData);
×
2757
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2758
    return;
×
2759
  }
2760

2761
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
42,627!
2762
         pData->destId.addr);
2763

2764
  if (pSyncNode->totalReplicaNum > 1) {
42,627✔
2765
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
42,625✔
2766
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
42,625✔
2767

2768
    if (timerLogicClock == msgLogicClock) {
42,625✔
2769
      if (tsNow > pData->execTime) {
42,622✔
2770
        pData->execTime += pSyncTimer->timerMS;
42,568✔
2771

2772
        SRpcMsg rpcMsg = {0};
42,568✔
2773
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
42,568!
2774
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2775
          syncNodeRelease(pSyncNode);
×
2776
          syncHbTimerDataRelease(pData);
×
2777
          return;
×
2778
        }
2779

2780
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
42,568✔
2781

2782
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
42,568✔
2783
        pSyncMsg->srcId = pSyncNode->myRaftId;
42,568✔
2784
        pSyncMsg->destId = pData->destId;
42,568✔
2785
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
42,568✔
2786
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
42,568✔
2787
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
42,568✔
2788
        pSyncMsg->privateTerm = 0;
42,568✔
2789
        pSyncMsg->timeStamp = tsNow;
42,568✔
2790

2791
        // update reset time
2792
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
42,568✔
2793
        pSyncTimer->timeStamp = tsNow;
42,568✔
2794

2795
        // send msg
2796
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
42,568✔
2797
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
42,568✔
2798
        sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId,
42,568!
2799
                DID(&(pSyncMsg->destId)));
2800
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
42,568✔
2801
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
42,568✔
2802
        if (ret != 0) {
42,568✔
2803
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
80!
2804
        }
2805
      }
2806

2807
      if (syncIsInit()) {
42,622!
2808
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
42,622!
2809
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
42,622✔
2810
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
42,622✔
2811
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
42,622!
2812

2813
      } else {
2814
        sError("sync env is stop, reset peer hb timer error");
×
2815
      }
2816

2817
    } else {
2818
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
3!
2819
             timerLogicClock, msgLogicClock);
2820
    }
2821
  }
2822

2823
  syncHbTimerDataRelease(pData);
42,627✔
2824
  syncNodeRelease(pSyncNode);
42,627✔
2825
}
2826

2827
#ifdef BUILD_NO_CALL
2828
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
2829
  (void)ud;
2830
  taosMemoryFree(value);
2831
}
2832

2833
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
2834
  SSyncLogStoreData* pData = pLogStore->data;
2835
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);
2836

2837
  int32_t   code = 0;
2838
  int32_t   entryLen = sizeof(*pEntry) + pEntry->dataLen;
2839
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
2840
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);
2841
  if (status != TAOS_LRU_STATUS_OK) {
2842
    code = -1;
2843
  }
2844

2845
  return code;
2846
}
2847
#endif
2848

2849
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
×
2850
  cfg->replicaNum = 0;
×
2851
  cfg->totalReplicaNum = 0;
×
2852
  int32_t code = 0;
×
2853

2854
  for (int i = 0; i < pReq->replica; ++i) {
×
2855
    SNodeInfo* pNode = &cfg->nodeInfo[i];
×
2856
    pNode->nodeId = pReq->replicas[i].id;
×
2857
    pNode->nodePort = pReq->replicas[i].port;
×
2858
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
×
2859
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2860
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
×
2861
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
×
2862
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
2863
    cfg->replicaNum++;
×
2864
  }
2865
  if (pReq->selfIndex != -1) {
×
2866
    cfg->myIndex = pReq->selfIndex;
×
2867
  }
2868
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
×
2869
    SNodeInfo* pNode = &cfg->nodeInfo[i];
×
2870
    pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
×
2871
    pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
×
2872
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2873
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
×
2874
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
×
2875
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
×
2876
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
2877
    cfg->totalReplicaNum++;
×
2878
  }
2879
  cfg->totalReplicaNum += pReq->replica;
×
2880
  if (pReq->learnerSelfIndex != -1) {
×
2881
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
×
2882
  }
2883
  cfg->changeVersion = pReq->changeVersion;
×
2884
}
×
2885

2886
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
×
2887
  int32_t code = 0;
×
2888
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
×
2889
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
2890
  }
2891

2892
  SMsgHead* head = (SMsgHead*)pEntry->data;
×
2893
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
×
2894

2895
  SAlterVnodeTypeReq req = {0};
×
2896
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
×
2897
    code = TSDB_CODE_INVALID_MSG;
×
2898
    TAOS_RETURN(code);
×
2899
  }
2900

2901
  SSyncCfg cfg = {0};
×
2902
  syncBuildConfigFromReq(&req, &cfg);
×
2903

2904
  if (cfg.totalReplicaNum >= 1 &&
×
2905
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
×
2906
    bool incfg = false;
×
2907
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
×
2908
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
×
2909
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
×
2910
        incfg = true;
×
2911
        break;
×
2912
      }
2913
    }
2914

2915
    if (!incfg) {
×
2916
      SyncTerm currentTerm = raftStoreGetTerm(ths);
×
2917
      SRaftId  id = EMPTY_RAFT_ID;
×
2918
      syncNodeStepDown(ths, currentTerm, id);
×
2919
      return 1;
×
2920
    }
2921
  }
2922
  return 0;
×
2923
}
2924

2925
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
×
2926
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
×
2927
        ", changeVersion:%d, "
2928
        "restoreFinish:%d",
2929
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
2930
        ths->restoreFinish);
2931

2932
  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
×
2933
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
2934
        ths->myNodeInfo.nodeRole);
2935

2936
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
2937
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
×
2938
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
2939
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
2940
  }
2941

2942
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
2943
    char    buf[256];
2944
    int32_t len = 256;
×
2945
    int32_t n = 0;
×
2946
    n += tsnprintf(buf + n, len - n, "%s", "{");
×
2947
    for (int i = 0; i < ths->peersEpset->numOfEps; i++) {
×
2948
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port,
×
2949
                     (i + 1 < ths->peersEpset->numOfEps ? ", " : ""));
×
2950
    }
2951
    n += tsnprintf(buf + n, len - n, "%s", "}");
×
2952

2953
    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset->inUse);
×
2954
  }
2955

2956
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
2957
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
×
2958
  }
2959

2960
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
2961
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
×
2962
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
2963
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
2964
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
2965
  }
2966

2967
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
2968
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
×
2969
  }
2970
}
×
2971

2972
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
×
2973
  int32_t i = 0;
×
2974

2975
  // change peersNodeInfo
2976
  i = 0;
×
2977
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
2978
    if (!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
2979
          ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
×
2980
      ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
×
2981
      ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
×
2982
      tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
×
2983
      ths->peersNodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
×
2984
      ths->peersNodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
×
2985

2986
      syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]);
×
2987

2988
      if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i]) == false) {
×
2989
        sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i);
×
2990
        return terrno;
×
2991
      }
2992

2993
      i++;
×
2994
    }
2995
  }
2996
  ths->peersNum = i;
×
2997

2998
  // change cfg nodeInfo
2999
  ths->raftCfg.cfg.replicaNum = 0;
×
3000
  i = 0;
×
3001
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
3002
    if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3003
      ths->raftCfg.cfg.replicaNum++;
×
3004
    }
3005
    ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
×
3006
    ths->raftCfg.cfg.nodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
×
3007
    tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
×
3008
    ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
×
3009
    ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
×
3010
    if ((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
3011
         ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
×
3012
      ths->raftCfg.cfg.myIndex = i;
×
3013
    }
3014
    i++;
×
3015
  }
3016
  ths->raftCfg.cfg.totalReplicaNum = i;
×
3017

3018
  return 0;
×
3019
}
3020

3021
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
×
3022
  // change peersNodeInfo
3023
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
3024
    for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
3025
      if (strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
3026
          ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
×
3027
        if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3028
          ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3029
        }
3030
      }
3031
    }
3032
  }
3033

3034
  // change cfg nodeInfo
3035
  ths->raftCfg.cfg.replicaNum = 0;
×
3036
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
3037
    for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
3038
      if (strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
3039
          ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
×
3040
        if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3041
          ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3042
          ths->raftCfg.cfg.replicaNum++;
×
3043
        }
3044
      }
3045
    }
3046
  }
3047
}
×
3048

3049
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
×
3050
  int32_t code = 0;
×
3051
  // 1.rebuild replicasId, remove deleted one
3052
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
3053
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
×
3054

3055
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3056
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
×
3057
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
3058
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
×
3059
  }
3060

3061
  // 2.rebuild MatchIndex, remove deleted one
3062
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;
×
3063

3064
  ths->pMatchIndex = syncIndexMgrCreate(ths);
×
3065
  if (ths->pMatchIndex == NULL) {
×
3066
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
3067
    if (terrno != 0) code = terrno;
×
3068
    TAOS_RETURN(code);
×
3069
  }
3070

3071
  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
×
3072

3073
  syncIndexMgrDestroy(oldIndex);
×
3074

3075
  // 3.rebuild NextIndex, remove deleted one
3076
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
×
3077

3078
  ths->pNextIndex = syncIndexMgrCreate(ths);
×
3079
  if (ths->pNextIndex == NULL) {
×
3080
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
3081
    if (terrno != 0) code = terrno;
×
3082
    TAOS_RETURN(code);
×
3083
  }
3084

3085
  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
×
3086

3087
  syncIndexMgrDestroy(oldNextIndex);
×
3088

3089
  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
3090
  voteGrantedUpdate(ths->pVotesGranted, ths);
×
3091
  votesRespondUpdate(ths->pVotesRespond, ths);
×
3092

3093
  // 5.rebuild logReplMgr
3094
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
×
3095
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
×
3096
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
3097
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
3098
  }
3099

3100
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
×
3101
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
×
3102
  oldLogReplMgrs = taosMemoryMalloc(length);
×
3103
  if (NULL == oldLogReplMgrs) return terrno;
×
3104
  memset(oldLogReplMgrs, 0, length);
×
3105

3106
  for (int i = 0; i < oldtotalReplicaNum; i++) {
×
3107
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
×
3108
  }
3109

3110
  syncNodeLogReplDestroy(ths);
×
3111
  if ((code = syncNodeLogReplInit(ths)) != 0) {
×
3112
    taosMemoryFree(oldLogReplMgrs);
×
3113
    TAOS_RETURN(code);
×
3114
  }
3115

3116
  for (int i = 0; i < ths->totalReplicaNum; ++i) {
×
3117
    for (int j = 0; j < oldtotalReplicaNum; j++) {
×
3118
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
×
3119
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
×
3120
        ths->logReplMgrs[i]->peerId = i;
×
3121
      }
3122
    }
3123
  }
3124

3125
  for (int i = 0; i < ths->totalReplicaNum; ++i) {
×
3126
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
×
3127
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
3128
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
3129
  }
3130

3131
  // 6.rebuild sender
3132
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
×
3133
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
×
3134
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
3135
  }
3136

3137
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
×
3138
    if (ths->senders[i] != NULL) {
×
3139
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);
×
3140

3141
      if (snapshotSenderIsStart(ths->senders[i])) {
×
3142
        snapshotSenderStop(ths->senders[i], false);
×
3143
      }
3144

3145
      snapshotSenderDestroy(ths->senders[i]);
×
3146
      ths->senders[i] = NULL;
×
3147
    }
3148
  }
3149

3150
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
×
3151
    SSyncSnapshotSender* pSender = NULL;
×
3152
    int32_t              code = snapshotSenderCreate(ths, i, &pSender);
×
3153
    if (pSender == NULL) return terrno = code;
×
3154

3155
    ths->senders[i] = pSender;
×
3156
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
×
3157
  }
3158

3159
  for (int i = 0; i < ths->totalReplicaNum; i++) {
×
3160
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
×
3161
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
3162
  }
3163

3164
  // 7.rebuild synctimer
3165
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
×
3166
    taosMemoryFree(oldLogReplMgrs);
×
3167
    TAOS_RETURN(code);
×
3168
  }
3169

3170
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
×
3171
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
×
3172
      taosMemoryFree(oldLogReplMgrs);
×
3173
      TAOS_RETURN(code);
×
3174
    }
3175
  }
3176

3177
  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
×
3178
    taosMemoryFree(oldLogReplMgrs);
×
3179
    TAOS_RETURN(code);
×
3180
  }
3181

3182
  // 8.rebuild peerStates
3183
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
×
3184
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
×
3185
    oldState[i] = ths->peerStates[i];
×
3186
  }
3187

3188
  for (int i = 0; i < ths->totalReplicaNum; i++) {
×
3189
    for (int j = 0; j < oldtotalReplicaNum; j++) {
×
3190
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
×
3191
        ths->peerStates[i] = oldState[j];
×
3192
      }
3193
    }
3194
  }
3195

3196
  taosMemoryFree(oldLogReplMgrs);
×
3197

3198
  return 0;
×
3199
}
3200

3201
void syncNodeChangeToVoter(SSyncNode* ths) {
×
3202
  // replicasId, only need to change replicaNum when 1->3
3203
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3204
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
×
3205
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
×
3206
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, i, ths->replicasId[i].addr);
×
3207
  }
3208

3209
  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
3210
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3211
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3212

3213
  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
×
3214
  for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i) {
×
3215
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]);
×
3216
  }
3217

3218
  // pVotesGranted, pVotesRespond
3219
  voteGrantedUpdate(ths->pVotesGranted, ths);
×
3220
  votesRespondUpdate(ths->pVotesRespond, ths);
×
3221

3222
  // logRepMgrs
3223
  // no need to change logRepMgrs when 1->3
3224
}
×
3225

3226
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
×
3227
  SNodeInfo node = {0};
×
3228
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
3229
    memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo));
×
3230
  }
3231

3232
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
3233
    memcpy(&ths->raftCfg.cfg.nodeInfo[i], &node, sizeof(SNodeInfo));
×
3234
  }
3235
}
×
3236

3237
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
×
3238
  int32_t code = 0;
×
3239
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
×
3240
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3241
  }
3242

3243
  SMsgHead* head = (SMsgHead*)pEntry->data;
×
3244
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
×
3245

3246
  SAlterVnodeTypeReq req = {0};
×
3247
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
×
3248
    code = TSDB_CODE_INVALID_MSG;
×
3249
    TAOS_RETURN(code);
×
3250
  }
3251

3252
  SSyncCfg cfg = {0};
×
3253
  syncBuildConfigFromReq(&req, &cfg);
×
3254

3255
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
×
3256
    sInfo(
×
3257
        "vgId:%d, skip conf change entry since lower version. "
3258
        "this entry, index:%" PRId64 ", term:%" PRId64
3259
        ", totalReplicaNum:%d, changeVersion:%d; "
3260
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
3261
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
3262
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
3263
    return 0;
×
3264
  }
3265

3266
  if (strcmp(str, "Commit") == 0) {
×
3267
    sInfo(
×
3268
        "vgId:%d, change config from %s. "
3269
        "this, i:%" PRId64
3270
        ", trNum:%d, vers:%d; "
3271
        "node, rNum:%d, pNum:%d, trNum:%d, "
3272
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
3273
        "), "
3274
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
3275
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
3276
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
3277
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
3278
  } else {
3279
    sInfo(
×
3280
        "vgId:%d, change config from %s. "
3281
        "this, i:%" PRId64 ", t:%" PRId64
3282
        ", trNum:%d, vers:%d; "
3283
        "node, rNum:%d, pNum:%d, trNum:%d, "
3284
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
3285
        "), "
3286
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
3287
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
3288
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
3289
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
3290
        ths->pLogBuf->commitIndex);
3291
  }
3292

3293
  syncNodeLogConfigInfo(ths, &cfg, "before config change");
×
3294

3295
  int32_t oldTotalReplicaNum = ths->totalReplicaNum;
×
3296

3297
  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {  // remove replica
×
3298

3299
    bool incfg = false;
×
3300
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
×
3301
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
×
3302
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
×
3303
        incfg = true;
×
3304
        break;
×
3305
      }
3306
    }
3307

3308
    if (incfg) {  // remove other
×
3309
      syncNodeResetPeerAndCfg(ths);
×
3310

3311
      // no need to change myNodeInfo
3312

3313
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
×
3314
        TAOS_RETURN(code);
×
3315
      };
3316

3317
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
×
3318
        TAOS_RETURN(code);
×
3319
      };
3320
    } else {  // remove myself
3321
      // no need to do anything actually, to change the following to reduce distruptive server chance
3322

3323
      syncNodeResetPeerAndCfg(ths);
×
3324

3325
      // change myNodeInfo
3326
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3327

3328
      // change peer and cfg
3329
      ths->peersNum = 0;
×
3330
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
×
3331
      ths->raftCfg.cfg.replicaNum = 0;
×
3332
      ths->raftCfg.cfg.totalReplicaNum = 1;
×
3333

3334
      // change other
3335
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
×
3336
        TAOS_RETURN(code);
×
3337
      }
3338

3339
      // change state
3340
      ths->state = TAOS_SYNC_STATE_LEARNER;
×
3341
    }
3342

3343
    ths->restoreFinish = false;
×
3344
  } else {                            // add replica, or change replica type
3345
    if (ths->totalReplicaNum == 3) {  // change replica type
×
3346
      sInfo("vgId:%d, begin change replica type", ths->vgId);
×
3347

3348
      // change myNodeInfo
3349
      for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
×
3350
        if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
×
3351
            ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
×
3352
          if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3353
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3354
          }
3355
        }
3356
      }
3357

3358
      // change peer and cfg
3359
      syncNodeChangePeerAndCfgToVoter(ths, &cfg);
×
3360

3361
      // change other
3362
      syncNodeChangeToVoter(ths);
×
3363

3364
      // change state
3365
      if (ths->state == TAOS_SYNC_STATE_LEARNER) {
×
3366
        if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3367
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
×
3368
        }
3369
      }
3370

3371
      ths->restoreFinish = false;
×
3372
    } else {  // add replica
3373
      sInfo("vgId:%d, begin add replica", ths->vgId);
×
3374

3375
      // no need to change myNodeInfo
3376

3377
      // change peer and cfg
3378
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
×
3379
        TAOS_RETURN(code);
×
3380
      };
3381

3382
      // change other
3383
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
×
3384
        TAOS_RETURN(code);
×
3385
      };
3386

3387
      // no need to change state
3388

3389
      if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
3390
        ths->restoreFinish = false;
×
3391
      }
3392
    }
3393
  }
3394

3395
  ths->quorum = syncUtilQuorum(ths->replicaNum);
×
3396

3397
  ths->raftCfg.lastConfigIndex = pEntry->index;
×
3398
  ths->raftCfg.cfg.lastIndex = pEntry->index;
×
3399
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;
×
3400

3401
  syncNodeLogConfigInfo(ths, &cfg, "after config change");
×
3402

3403
  if ((code = syncWriteCfgFile(ths)) != 0) {
×
3404
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
×
3405
    TAOS_RETURN(code);
×
3406
  };
3407

3408
  TAOS_RETURN(code);
×
3409
}
3410

3411
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
10,456,649✔
3412
  int32_t code = -1;
10,456,649✔
3413
  if (pEntry->dataLen < sizeof(SMsgHead)) {
10,456,649!
3414
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
3415
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
×
3416
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
3417
    syncEntryDestroy(pEntry);
×
3418
    pEntry = NULL;
×
3419
    goto _out;
×
3420
  }
3421

3422
  // append to log buffer
3423
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
10,456,649✔
3424
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
266!
3425
    int32_t ret = 0;
266✔
3426
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
266!
3427
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(ret));
×
3428
    }
3429
    syncEntryDestroy(pEntry);
×
3430
    pEntry = NULL;
×
3431
    goto _out;
×
3432
  }
3433

3434
  code = 0;
10,456,560✔
3435
_out:;
10,456,560✔
3436
  // proceed match index, with replicating on needed
3437
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
10,456,560✔
3438
  const STraceId* trace = pEntry ? &pEntry->originRpcTraceId : NULL;
10,456,465!
3439

3440
  if (pEntry != NULL) {
10,456,465!
3441
    sGDebug(trace,
10,456,698!
3442
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
3443
            " %" PRId64 ", %" PRId64 ")",
3444
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
3445
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
3446
  }
3447

3448
  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
10,456,467!
3449
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
×
3450
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);
×
3451

3452
    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
×
3453
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
×
3454
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
×
3455
              pMsg, ths->commitIndex);
3456
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
3457
    }
3458
  }
3459

3460
  // multi replica
3461
  if (ths->replicaNum > 1) {
10,456,400✔
3462
    TAOS_RETURN(code);
142,199✔
3463
  }
3464

3465
  // single replica
3466
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
10,314,201✔
3467
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
10,314,379!
3468
          matchIndex, pMsg, returnIndex);
3469

3470
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
20,628,848!
3471
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry")) < 0) {
10,314,258✔
3472
    sGError(trace,
×
3473
            "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
3474
            ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
3475
  }
3476

3477
  TAOS_RETURN(code);
10,314,590✔
3478
}
3479

3480
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
10,446,444✔
3481
  if (pSyncNode->totalReplicaNum == 1) {
10,446,444✔
3482
    return false;
10,255,658✔
3483
  }
3484

3485
  int32_t toCount = 0;
190,786✔
3486
  int64_t tsNow = taosGetTimestampMs();
190,967✔
3487
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
523,351✔
3488
    if (pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
332,386✔
3489
      continue;
48,800✔
3490
    }
3491
    int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
283,586✔
3492
    if (recvTime == 0 || recvTime == -1) {
283,584!
3493
      continue;
×
3494
    }
3495

3496
    if (tsNow - recvTime > tsHeartbeatTimeout) {
283,586✔
3497
      toCount++;
7,893✔
3498
    }
3499
  }
3500

3501
  bool b = (toCount >= pSyncNode->quorum ? true : false);
190,965✔
3502

3503
  return b;
190,965✔
3504
}
3505

3506
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
×
3507
  if (pSyncNode == NULL) return false;
×
3508
  bool b = false;
×
3509
  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
×
3510
    if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) {
×
3511
      b = true;
×
3512
      break;
×
3513
    }
3514
  }
3515
  return b;
×
3516
}
3517

3518
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
×
3519
  if (pSyncNode == NULL) return false;
×
3520
  if (pSyncNode->pNewNodeReceiver == NULL) return false;
×
3521
  if (pSyncNode->pNewNodeReceiver->start) return true;
×
3522
  return false;
×
3523
}
3524

3525
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
11,361✔
3526
  int32_t   code = 0;
11,361✔
3527
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
11,361✔
3528
  SyncTerm  term = raftStoreGetTerm(ths);
11,361✔
3529

3530
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
11,361✔
3531
  if (pEntry == NULL) {
11,361!
3532
    code = TSDB_CODE_OUT_OF_MEMORY;
×
3533
    TAOS_RETURN(code);
×
3534
  }
3535

3536
  code = syncNodeAppend(ths, pEntry, NULL);
11,361✔
3537
  TAOS_RETURN(code);
11,361✔
3538
}
3539

3540
#ifdef BUILD_NO_CALL
3541
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
3542
  int32_t ret = 0;
3543

3544
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
3545
  SyncTerm        term = raftStoreGetTerm(ths);
3546
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
3547
  if (pEntry == NULL) return -1;
3548

3549
  LRUHandle* h = NULL;
3550

3551
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
3552
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
3553
    if (code != 0) {
3554
      sError("append noop error");
3555
      return -1;
3556
    }
3557

3558
    syncCacheEntry(ths->pLogStore, pEntry, &h);
3559
  }
3560

3561
  if (h) {
3562
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
3563
  } else {
3564
    syncEntryDestroy(pEntry);
3565
  }
3566

3567
  return ret;
3568
}
3569
#endif
3570

3571
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
38,514✔
3572
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
38,514✔
3573
  bool           resetElect = false;
38,514✔
3574

3575
  int64_t tsMs = taosGetTimestampMs();
38,514✔
3576

3577
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
38,514✔
3578
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
38,518✔
3579

3580
  int64_t netElapsed = tsMs - pMsg->timeStamp;
38,519✔
3581
  int64_t timeDiff = tsMs - lastRecvTime;
38,519✔
3582
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff);
38,519✔
3583

3584
  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
38,516✔
3585
    sWarn(
7!
3586
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
3587
        "cluster:%d",
3588
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
3589
    return 0;
7✔
3590
  }
3591

3592
  SyncTerm currentTerm = raftStoreGetTerm(ths);
38,512✔
3593

3594
  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
38,511✔
3595
    raftStoreSetTerm(ths, pMsg->term);
251✔
3596
    currentTerm = pMsg->term;
251✔
3597
  }
3598

3599
  int64_t tsMs2 = taosGetTimestampMs();
38,512✔
3600

3601
  int64_t processTime = tsMs2 - tsMs;
38,512✔
3602
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
38,512!
3603
    sGError(&pRpcMsg->info.traceId,
×
3604
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
3605
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
3606
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
3607
            processTime);
3608
  } else {
3609
    sGDebug(&pRpcMsg->info.traceId,
38,512!
3610
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
3611
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
3612
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
3613
            processTime);
3614
  }
3615

3616
  if (pMsg->term == currentTerm &&
38,512✔
3617
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
38,298!
3618
    resetElect = true;
38,299✔
3619

3620
    ths->minMatchIndex = pMsg->minMatchIndex;
38,299✔
3621

3622
    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
38,299✔
3623
      SRpcMsg rpcMsgLocalCmd = {0};
38,298✔
3624
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
38,298!
3625
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;
38,302✔
3626

3627
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
38,302✔
3628
      pSyncMsg->cmd =
38,302✔
3629
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
38,302✔
3630
      pSyncMsg->commitIndex = pMsg->commitIndex;
38,302✔
3631
      pSyncMsg->currentTerm = pMsg->term;
38,302✔
3632

3633
      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
38,302!
3634
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
38,302✔
3635
        if (code != 0) {
38,296!
3636
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
×
3637
          rpcFreeCont(rpcMsgLocalCmd.pCont);
×
3638
        } else {
3639
          sGTrace(&pRpcMsg->info.traceId,
38,296!
3640
                  "vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
3641
                  pMsg->commitIndex, pMsg->term);
3642
        }
3643
      }
3644
    }
3645
  }
3646

3647
  if (pMsg->term >= currentTerm &&
38,510✔
3648
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
38,507!
3649
    SRpcMsg rpcMsgLocalCmd = {0};
×
3650
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
×
3651
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;
×
3652

3653
    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
×
3654
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
×
3655
    pSyncMsg->currentTerm = pMsg->term;
×
3656
    pSyncMsg->commitIndex = pMsg->commitIndex;
×
3657

3658
    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
×
3659
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
×
3660
      if (code != 0) {
×
3661
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
×
3662
        rpcFreeCont(rpcMsgLocalCmd.pCont);
×
3663
      } else {
3664
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
×
3665
      }
3666
    }
3667
  }
3668

3669
  SRpcMsg rpcMsg = {0};
38,511✔
3670
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
38,511!
3671
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
38,510✔
3672
  pMsgReply->destId = pMsg->srcId;
38,510✔
3673
  pMsgReply->srcId = ths->myRaftId;
38,510✔
3674
  pMsgReply->term = currentTerm;
38,510✔
3675
  pMsgReply->privateTerm = 8864;  // magic number
38,510✔
3676
  pMsgReply->startTime = ths->startTime;
38,510✔
3677
  pMsgReply->timeStamp = tsMs;
38,510✔
3678
  rpcMsg.info.traceId = pRpcMsg->info.traceId;
38,510✔
3679

3680
  // reply
3681
  int64_t tsMs3 = taosGetTimestampMs();
38,510✔
3682

3683
  int64_t processTime2 = tsMs3 - tsMs2;
38,510✔
3684
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
38,510✔
3685
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
38,511!
3686
    sGError(&rpcMsg.info.traceId,
×
3687
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
3688
            ", processTime:%" PRId64,
3689
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
3690
  } else {
3691
    sGDebug(&rpcMsg.info.traceId,
38,511!
3692
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
3693
            ", processTime:%" PRId64,
3694
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
3695
  }
3696

3697
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));
38,511!
3698

3699
  if (resetElect) syncNodeResetElectTimer(ths);
38,512✔
3700
  return 0;
38,512✔
3701
}
3702

3703
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
37,761✔
3704
  int32_t code = 0;
37,761✔
3705

3706
  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
37,761✔
3707
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
37,761✔
3708
  if (pMgr == NULL) {
37,761!
3709
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
3710
    if (terrno != 0) code = terrno;
×
3711
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x016%" PRIx64, ths->vgId, pMsg->srcId.addr);
×
3712
    TAOS_RETURN(code);
×
3713
  }
3714

3715
  int64_t tsMs = taosGetTimestampMs();
37,761✔
3716
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
37,761✔
3717
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);
37,761✔
3718

3719
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
37,761✔
3720

3721
  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
37,761✔
3722
}
3723

3724
#ifdef BUILD_NO_CALL
3725
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
3726
  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
3727

3728
  int64_t tsMs = taosGetTimestampMs();
3729
  int64_t timeDiff = tsMs - pMsg->timeStamp;
3730
  syncLogRecvHeartbeatReply(ths, pMsg, timeDiff, &pRpcMsg->info.traceId, timeDiff);
3731

3732
  // update last reply time, make decision whether the other node is alive or not
3733
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
3734
  return 0;
3735
}
3736
#endif
3737

3738
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
38,302✔
3739
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
38,302✔
3740
  syncLogRecvLocalCmd(ths, pMsg, &pRpcMsg->info.traceId);
38,302✔
3741

3742
  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
38,302!
3743
    SRaftId id = EMPTY_RAFT_ID;
×
3744
    syncNodeStepDown(ths, pMsg->currentTerm, id);
×
3745

3746
  } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
76,604!
3747
    if (syncLogBufferIsEmpty(ths->pLogBuf)) {
38,302!
3748
      sError("vgId:%d, sync log buffer is empty", ths->vgId);
×
3749
      return 0;
×
3750
    }
3751
    SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
38,302✔
3752
    if (matchTerm < 0) {
38,302!
3753
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3754
    }
3755
    if (pMsg->currentTerm == matchTerm) {
38,302✔
3756
      SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
35,217✔
3757
      sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
35,217!
3758
    }
3759
    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
76,604!
3760
        syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
38,302✔
3761
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
×
3762
             ths->commitIndex);
3763
    }
3764
  } else {
3765
    sError("error local cmd");
×
3766
  }
3767

3768
  return 0;
38,302✔
3769
}
3770

3771
// TLA+ Spec
3772
// ClientRequest(i, v) ==
3773
//     /\ state[i] = Leader
3774
//     /\ LET entry == [term  |-> currentTerm[i],
3775
//                      value |-> v]
3776
//            newLog == Append(log[i], entry)
3777
//        IN  log' = [log EXCEPT ![i] = newLog]
3778
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3779
//                    leaderVars, commitIndex>>
3780
//
3781

3782
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
10,445,237✔
3783
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
10,445,237!
3784
  int32_t code = 0;
10,445,239✔
3785

3786
  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
10,445,239✔
3787
  SyncTerm        term = raftStoreGetTerm(ths);
10,445,394✔
3788
  SSyncRaftEntry* pEntry = NULL;
10,445,405✔
3789
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
10,445,405✔
3790
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
230,947✔
3791
  } else {
3792
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
10,214,458✔
3793
  }
3794

3795
  if (pEntry == NULL) {
10,445,421!
3796
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg, terrstr());
×
3797
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3798
  }
3799

3800
  // 1->2, config change is add in write thread, and will continue in sync thread
3801
  // need save message for it
3802
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
10,445,421!
3803
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
×
3804
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
×
3805
    pEntry->seqNum = seqNum;
×
3806
  }
3807

3808
  if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
10,445,421!
3809
    if (pRetIndex) {
10,445,421✔
3810
      (*pRetIndex) = index;
10,214,439✔
3811
    }
3812

3813
    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
10,445,421!
3814
      int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
×
3815
      if (code < 0) {
×
3816
        sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg, terrstr());
×
3817
        syncEntryDestroy(pEntry);
×
3818
        pEntry = NULL;
×
3819
        TAOS_RETURN(code);
×
3820
      }
3821

3822
      if (code > 0) {
×
3823
        SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
×
3824
        int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
×
3825
        sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
×
3826
                ths->vgId, pMsg, pEntry->seqNum, num);
3827
        if (rsp.info.handle != NULL) {
×
3828
          tmsgSendRsp(&rsp);
×
3829
        }
3830
        syncEntryDestroy(pEntry);
×
3831
        pEntry = NULL;
×
3832
        TAOS_RETURN(code);
×
3833
      }
3834
    }
3835

3836
    code = syncNodeAppend(ths, pEntry, pMsg);
10,445,421✔
3837
    return code;
10,445,011✔
3838
  } else {
3839
    syncEntryDestroy(pEntry);
×
3840
    pEntry = NULL;
×
3841
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3842
  }
3843
}
3844

3845
const char* syncStr(ESyncState state) {
2,174,548✔
3846
  switch (state) {
2,174,548!
3847
    case TAOS_SYNC_STATE_FOLLOWER:
464,613✔
3848
      return "follower";
464,613✔
3849
    case TAOS_SYNC_STATE_CANDIDATE:
10,269✔
3850
      return "candidate";
10,269✔
3851
    case TAOS_SYNC_STATE_LEADER:
1,676,433✔
3852
      return "leader";
1,676,433✔
3853
    case TAOS_SYNC_STATE_ERROR:
×
3854
      return "error";
×
3855
    case TAOS_SYNC_STATE_OFFLINE:
4,083✔
3856
      return "offline";
4,083✔
3857
    case TAOS_SYNC_STATE_LEARNER:
19,218✔
3858
      return "learner";
19,218✔
3859
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
×
3860
      return "assigned leader";
×
3861
    default:
×
3862
      return "unknown";
×
3863
  }
3864
}
3865

3866
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
1,678✔
3867
  for (int32_t i = 0; i < pNewCfg->totalReplicaNum; ++i) {
1,932!
3868
    SRaftId raftId = {
1,932✔
3869
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[i]),
1,932✔
3870
        .vgId = ths->vgId,
1,932✔
3871
    };
3872

3873
    if (syncUtilSameId(&(ths->myRaftId), &raftId)) {
1,932✔
3874
      pNewCfg->myIndex = i;
1,678✔
3875
      return 0;
1,678✔
3876
    }
3877
  }
3878

3879
  return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3880
}
3881

3882
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
10,446,450✔
3883
  return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
10,446,450!
3884
}
3885

3886
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
2,407,118✔
3887
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
3,960,546!
3888
    if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
3,960,546✔
3889
      return true;
2,407,113✔
3890
    }
3891
  }
UNCOV
3892
  return false;
×
3893
}
3894

3895
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
34,176✔
3896
  SSyncSnapshotSender* pSender = NULL;
34,176✔
3897
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
131,338✔
3898
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
97,129✔
3899
      pSender = (ths->senders)[i];
34,222✔
3900
    }
3901
  }
3902
  return pSender;
34,209✔
3903
}
3904

3905
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
26,845✔
3906
  SSyncTimer* pTimer = NULL;
26,845✔
3907
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
112,759✔
3908
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
85,914✔
3909
      pTimer = &((ths->peerHeartbeatTimerArr)[i]);
26,845✔
3910
    }
3911
  }
3912
  return pTimer;
26,845✔
3913
}
3914

3915
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
131,248✔
3916
  SPeerState* pState = NULL;
131,248✔
3917
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
417,637✔
3918
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
286,391✔
3919
      pState = &((ths->peerStates)[i]);
131,248✔
3920
    }
3921
  }
3922
  return pState;
131,246✔
3923
}
3924

3925
#ifdef BUILD_NO_CALL
3926
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
3927
  SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
3928
  if (pState == NULL) {
3929
    sError("vgId:%d, replica maybe dropped", ths->vgId);
3930
    return false;
3931
  }
3932

3933
  SyncIndex sendIndex = pMsg->prevLogIndex + 1;
3934
  int64_t   tsNow = taosGetTimestampMs();
3935

3936
  if (pState->lastSendIndex == sendIndex && tsNow - pState->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
3937
    return false;
3938
  }
3939

3940
  return true;
3941
}
3942

3943
bool syncNodeCanChange(SSyncNode* pSyncNode) {
3944
  if (pSyncNode->changing) {
3945
    sError("sync cannot change");
3946
    return false;
3947
  }
3948

3949
  if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) {
3950
    SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
3951
    if (pSyncNode->commitIndex != lastIndex) {
3952
      sError("sync cannot change2");
3953
      return false;
3954
    }
3955
  }
3956

3957
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
3958
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
3959
    if (pSender != NULL && pSender->start) {
3960
      sError("sync cannot change3");
3961
      return false;
3962
    }
3963
  }
3964

3965
  return true;
3966
}
3967
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc