• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4541

19 Jul 2025 01:13AM UTC coverage: 56.753% (-1.6%) from 58.31%
#4541

push

travis-ci

web-flow
fix: subquery memleak (#32024)

124299 of 282344 branches covered (44.02%)

Branch coverage included in aggregate %.

181106 of 255787 relevant lines covered (70.8%)

24937406.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.52
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
17,546✔
64
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);
17,546✔
65
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion);
17,546✔
66
  if (pSyncNode == NULL) {
17,551!
67
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
×
68
    return -1;
×
69
  }
70

71
  pSyncNode->rid = syncNodeAdd(pSyncNode);
17,551✔
72
  if (pSyncNode->rid < 0) {
17,551!
73
    syncNodeClose(pSyncNode);
×
74
    return -1;
×
75
  }
76

77
  pSyncNode->pingBaseLine = pSyncInfo->pingMs;
17,551✔
78
  pSyncNode->pingTimerMS = pSyncInfo->pingMs;
17,551✔
79
  pSyncNode->electBaseLine = pSyncInfo->electMs;
17,551✔
80
  pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs;
17,551✔
81
  pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
17,551✔
82
  pSyncNode->msgcb = pSyncInfo->msgcb;
17,551✔
83
  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
17,551✔
84
  return pSyncNode->rid;
17,551✔
85
}
86

87
int32_t syncStart(int64_t rid) {
17,550✔
88
  int32_t    code = 0;
17,550✔
89
  int32_t    vgId = 0;
17,550✔
90
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,550✔
91
  if (pSyncNode == NULL) {
17,550!
92
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
93
    if (terrno != 0) code = terrno;
×
94
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
95
    TAOS_RETURN(code);
×
96
  }
97
  vgId = pSyncNode->vgId;
17,550✔
98
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
17,550✔
99

100
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
17,550!
101
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
102
    goto _err;
×
103
  }
104
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
17,549✔
105

106
  if ((code = syncNodeStart(pSyncNode)) < 0) {
17,549!
107
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
108
    goto _err;
×
109
  }
110
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
17,550✔
111

112
  syncNodeRelease(pSyncNode);
17,550✔
113

114
  sInfo("vgId:%d, sync started", vgId);
17,550✔
115

116
  TAOS_RETURN(code);
17,550✔
117

118
_err:
×
119
  syncNodeRelease(pSyncNode);
×
120
  TAOS_RETURN(code);
×
121
}
122

123
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
34,746✔
124
  int32_t    code = 0;
34,746✔
125
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
34,746✔
126

127
  if (pSyncNode == NULL) {
34,748!
128
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
129
    if (terrno != 0) code = terrno;
×
130
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
131
    TAOS_RETURN(code);
×
132
  }
133

134
  *cfg = pSyncNode->raftCfg.cfg;
34,748✔
135

136
  syncNodeRelease(pSyncNode);
34,748✔
137

138
  return 0;
34,745✔
139
}
140

141
void syncStop(int64_t rid) {
17,550✔
142
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,550✔
143
  if (pSyncNode != NULL) {
17,550!
144
    pSyncNode->isStart = false;
17,550✔
145
    syncNodeRelease(pSyncNode);
17,550✔
146
    syncNodeRemove(rid);
17,550✔
147
  }
148
}
17,549✔
149

150
void syncPreStop(int64_t rid) {
17,550✔
151
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,550✔
152
  if (pSyncNode != NULL) {
17,550!
153
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
17,550!
154
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
155
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
156
    }
157
    syncNodePreClose(pSyncNode);
17,550✔
158
    syncNodeRelease(pSyncNode);
17,550✔
159
  }
160
}
17,549✔
161

162
void syncPostStop(int64_t rid) {
15,061✔
163
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,061✔
164
  if (pSyncNode != NULL) {
15,061!
165
    syncNodePostClose(pSyncNode);
15,061✔
166
    syncNodeRelease(pSyncNode);
15,061✔
167
  }
168
}
15,061✔
169

170
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
2,313✔
171
  if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
2,313!
172
  return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
2,313!
173
}
174

175
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
2,459✔
176
  int32_t    code = 0;
2,459✔
177
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,459✔
178
  if (pSyncNode == NULL) {
2,459!
179
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
180
    if (terrno != 0) code = terrno;
×
181
    TAOS_RETURN(code);
×
182
  }
183

184
  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
2,459✔
185
    syncNodeRelease(pSyncNode);
146✔
186
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
146!
187
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
188
    return 0;
146✔
189
  }
190

191
  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
2,313!
192
    syncNodeRelease(pSyncNode);
×
193
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
×
194
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
×
195
    TAOS_RETURN(code);
×
196
  }
197

198
  TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg));
2,313!
199

200
  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
2,313!
201
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
×
202
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
×
203
    TAOS_RETURN(code);
×
204
  }
205

206
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
2,313!
207
    // TODO check return value
208
    TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
2,082!
209

210
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
33,312✔
211
      TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]));
31,230!
212
    }
213

214
    TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
2,082!
215
    // syncNodeReplicate(pSyncNode);
216
  }
217

218
  syncNodeRelease(pSyncNode);
2,313✔
219
  TAOS_RETURN(code);
2,313✔
220
}
221

222
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
15,309,307✔
223
  int32_t code = -1;
15,309,307✔
224
  if (!syncIsInit()) {
15,309,307!
225
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
226
    if (terrno != 0) code = terrno;
×
227
    TAOS_RETURN(code);
×
228
  }
229

230
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,309,225✔
231
  if (pSyncNode == NULL) {
15,310,084✔
232
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
607✔
233
    if (terrno != 0) code = terrno;
607!
234
    TAOS_RETURN(code);
×
235
  }
236

237
  switch (pMsg->msgType) {
15,309,477!
238
    case TDMT_SYNC_HEARTBEAT:
47,187✔
239
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
47,187✔
240
      break;
47,187✔
241
    case TDMT_SYNC_HEARTBEAT_REPLY:
46,519✔
242
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
46,519✔
243
      break;
46,519✔
244
    case TDMT_SYNC_TIMEOUT:
254,574✔
245
      code = syncNodeOnTimeout(pSyncNode, pMsg);
254,574✔
246
      break;
254,654✔
247
    case TDMT_SYNC_TIMEOUT_ELECTION:
1,787✔
248
      code = syncNodeOnTimeout(pSyncNode, pMsg);
1,787✔
249
      break;
1,787✔
250
    case TDMT_SYNC_CLIENT_REQUEST:
306,238✔
251
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
306,238✔
252
      break;
306,249✔
253
    case TDMT_SYNC_REQUEST_VOTE:
2,968✔
254
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
2,968✔
255
      break;
2,968✔
256
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
2,865✔
257
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
2,865✔
258
      break;
2,865✔
259
    case TDMT_SYNC_APPEND_ENTRIES:
7,205,984✔
260
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
7,205,984✔
261
      break;
7,205,987✔
262
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
7,200,748✔
263
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
7,200,748✔
264
      break;
7,200,745✔
265
    case TDMT_SYNC_SNAPSHOT_SEND:
96,888✔
266
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
96,888✔
267
      break;
96,888✔
268
    case TDMT_SYNC_SNAPSHOT_RSP:
96,919✔
269
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
96,919✔
270
      break;
96,919✔
271
    case TDMT_SYNC_LOCAL_CMD:
46,786✔
272
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
46,786✔
273
      break;
46,786✔
274
    case TDMT_SYNC_FORCE_FOLLOWER:
14✔
275
      code = syncForceBecomeFollower(pSyncNode, pMsg);
14✔
276
      break;
14✔
277
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
×
278
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
×
279
      break;
×
280
    default:
×
281
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
282
  }
283

284
  syncNodeRelease(pSyncNode);
15,309,568✔
285
  if (code != 0) {
15,309,563✔
286
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
161!
287
           tstrerror(code));
288
  }
289
  TAOS_RETURN(code);
15,309,563✔
290
}
291

292
int32_t syncLeaderTransfer(int64_t rid) {
17,550✔
293
  int32_t    code = 0;
17,550✔
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,550✔
295
  if (pSyncNode == NULL) {
17,550!
296
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
297
    if (terrno != 0) code = terrno;
×
298
    TAOS_RETURN(code);
×
299
  }
300

301
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
17,550✔
302
  syncNodeRelease(pSyncNode);
17,549✔
303
  return ret;
17,550✔
304
}
305

306
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
14✔
307
  SRaftId id = {0};
14✔
308
  syncNodeBecomeFollower(ths, id, "force election");
14✔
309

310
  SRpcMsg rsp = {
14✔
311
      .code = 0,
312
      .pCont = pRpcMsg->info.rsp,
14✔
313
      .contLen = pRpcMsg->info.rspLen,
14✔
314
      .info = pRpcMsg->info,
315
  };
316
  tmsgSendRsp(&rsp);
14✔
317

318
  return 0;
14✔
319
}
320

321
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
×
322
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
323
  void*   pHead = NULL;
×
324
  int32_t contLen = 0;
×
325

326
  SVArbSetAssignedLeaderReq req = {0};
×
327
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
×
328
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
×
329
    code = TSDB_CODE_INVALID_MSG;
×
330
    goto _OVER;
×
331
  }
332

333
  if (ths->arbTerm > req.arbTerm) {
×
334
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
×
335
          ths->arbTerm, req.arbTerm);
336
    goto _OVER;
×
337
  }
338

339
  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);
×
340

341
  if (!req.force) {
×
342
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
×
343
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
×
344
            req.memberToken);
345
      goto _OVER;
×
346
    }
347
  }
348

349
  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
350
    code = TSDB_CODE_SUCCESS;
×
351
    raftStoreNextTerm(ths);
×
352
    if (terrno != TSDB_CODE_SUCCESS) {
×
353
      code = terrno;
×
354
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
×
355
      goto _OVER;
×
356
    }
357
    syncNodeBecomeAssignedLeader(ths);
×
358

359
    if ((code = syncNodeAppendNoop(ths)) < 0) {
×
360
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
×
361
    }
362
  }
363

364
  SVArbSetAssignedLeaderRsp rsp = {0};
×
365
  rsp.arbToken = req.arbToken;
×
366
  rsp.memberToken = req.memberToken;
×
367
  rsp.vgId = ths->vgId;
×
368

369
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
×
370
  if (contLen <= 0) {
×
371
    code = TSDB_CODE_OUT_OF_MEMORY;
×
372
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
×
373
    goto _OVER;
×
374
  }
375
  pHead = rpcMallocCont(contLen);
×
376
  if (!pHead) {
×
377
    code = terrno;
×
378
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
×
379
    goto _OVER;
×
380
  }
381
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
×
382
    code = TSDB_CODE_OUT_OF_MEMORY;
×
383
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
×
384
    rpcFreeCont(pHead);
×
385
    goto _OVER;
×
386
  }
387

388
  code = TSDB_CODE_SUCCESS;
×
389

390
_OVER:;
×
391
  SRpcMsg rspMsg = {
×
392
      .code = code,
393
      .pCont = pHead,
394
      .contLen = contLen,
395
      .info = pRpcMsg->info,
396
  };
397

398
  tmsgSendRsp(&rspMsg);
×
399

400
  tFreeSVArbSetAssignedLeaderReq(&req);
×
401
  TAOS_RETURN(code);
×
402
}
403

404
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
405
  int32_t    code = 0;
×
406
  SSyncNode* pNode = syncNodeAcquire(rid);
×
407
  if (pNode == NULL) {
×
408
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
409
    if (terrno != 0) code = terrno;
×
410
    TAOS_RETURN(code);
×
411
  }
412

413
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
414
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
415
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
416

417
  syncNodeRelease(pNode);
×
418
  if (ret == 1) {
×
419
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
420
    code = rpcSendResponse(&rpcMsg);
×
421
    return code;
×
422
  } else {
423
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
424
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
425
  }
426
}
427

428
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
52,826✔
429
  SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
52,826✔
430

431
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
158,157✔
432
    SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
105,331✔
433
    if (minMatchIndex == SYNC_INDEX_INVALID) {
105,331✔
434
      minMatchIndex = matchIndex;
57,901✔
435
    } else if (matchIndex > 0 && matchIndex < minMatchIndex) {
47,430✔
436
      minMatchIndex = matchIndex;
1,886✔
437
    }
438
  }
439
  return minMatchIndex;
52,826✔
440
}
441

442
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
1,042✔
443
  return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
1,042✔
444
}
445

446
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
39,360✔
447
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
39,360✔
448
  int32_t    code = 0;
39,360✔
449
  if (pSyncNode == NULL) {
39,360✔
450
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
8✔
451
    if (terrno != 0) code = terrno;
8!
452
    sError("sync begin snapshot error");
8!
453
    TAOS_RETURN(code);
8✔
454
  }
455

456
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
39,352✔
457
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
39,352✔
458
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
39,352✔
459

460
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
39,351!
461
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
90!
462
    syncNodeRelease(pSyncNode);
90✔
463
    return 0;
90✔
464
  }
465

466
  int64_t logRetention = 0;
39,261✔
467

468
  if (syncNodeIsMnode(pSyncNode)) {
39,261✔
469
    // mnode
470
    logRetention = tsMndLogRetention;
4,601✔
471
  } else {
472
    // vnode
473
    if (pSyncNode->replicaNum > 1) {
34,661✔
474
      logRetention = SYNC_VNODE_LOG_RETENTION;
491✔
475
    }
476
  }
477

478
  if (pSyncNode->totalReplicaNum > 1) {
39,262✔
479
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
1,052✔
480
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
209!
481
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
10!
482
              lastApplyIndex);
483
      syncNodeRelease(pSyncNode);
10✔
484
      return 0;
10✔
485
    }
486
    SyncIndex retentionIndex =
1,042✔
487
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
1,042✔
488
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
1,042✔
489
  }
490

491
_DEL_WAL:
38,210✔
492

493
  do {
494
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
39,252✔
495
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
39,252✔
496
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
39,252✔
497
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
39,252✔
498
    if (lastApplyIndex <= walCommitVer) {
39,252!
499
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
39,252✔
500

501
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
39,252!
502
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
39,252✔
503
        pSyncNode->snapshottingTime = taosGetTimestampMs();
39,252✔
504

505
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
39,252✔
506
        if (code == 0) {
39,252!
507
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
39,252✔
508
                  pSyncNode->snapshottingIndex, lastApplyIndex);
509
        } else {
510
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
511
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
512
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
513
        }
514

515
      } else {
516
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
517
                snapshottingIndex, lastApplyIndex);
518
      }
519
    }
520
  } while (0);
521

522
  syncNodeRelease(pSyncNode);
39,252✔
523
  TAOS_RETURN(code);
39,252✔
524
}
525

526
int32_t syncEndSnapshot(int64_t rid) {
39,349✔
527
  int32_t    code = 0;
39,349✔
528
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
39,349✔
529
  if (pSyncNode == NULL) {
39,351!
530
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
531
    if (terrno != 0) code = terrno;
×
532
    sError("sync end snapshot error");
×
533
    TAOS_RETURN(code);
×
534
  }
535

536
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
39,351✔
537
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
39,250✔
538
    code = walEndSnapshot(pData->pWal);
39,250✔
539
    if (code != 0) {
39,252!
540
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
541
      syncNodeRelease(pSyncNode);
×
542
      TAOS_RETURN(code);
×
543
    } else {
544
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
39,252✔
545
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
39,252✔
546
    }
547
  }
548

549
  syncNodeRelease(pSyncNode);
39,352✔
550
  TAOS_RETURN(code);
39,352✔
551
}
552

553
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
25,263,344✔
554
  if (pSyncNode == NULL) {
25,263,344!
555
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
556
    sError("sync ready for read error");
×
557
    return false;
×
558
  }
559

560
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
25,263,344!
561
    terrno = TSDB_CODE_SYN_NOT_LEADER;
74,019✔
562
    return false;
74,019✔
563
  }
564

565
  if (!pSyncNode->restoreFinish) {
25,189,325✔
566
    terrno = TSDB_CODE_SYN_RESTORING;
508✔
567
    return false;
508✔
568
  }
569

570
  return true;
25,188,817✔
571
}
572

573
bool syncIsReadyForRead(int64_t rid) {
24,035,831✔
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
24,035,831✔
575
  if (pSyncNode == NULL) {
24,043,387!
576
    sError("sync ready for read error");
×
577
    return false;
×
578
  }
579

580
  bool ready = syncNodeIsReadyForRead(pSyncNode);
24,043,387✔
581

582
  syncNodeRelease(pSyncNode);
24,044,195✔
583
  return ready;
24,039,459✔
584
}
585

586
#ifdef BUILD_NO_CALL
587
bool syncSnapshotSending(int64_t rid) {
588
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
589
  if (pSyncNode == NULL) {
590
    return false;
591
  }
592

593
  bool b = syncNodeSnapshotSending(pSyncNode);
594
  syncNodeRelease(pSyncNode);
595
  return b;
596
}
597

598
bool syncSnapshotRecving(int64_t rid) {
599
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
600
  if (pSyncNode == NULL) {
601
    return false;
602
  }
603

604
  bool b = syncNodeSnapshotRecving(pSyncNode);
605
  syncNodeRelease(pSyncNode);
606
  return b;
607
}
608
#endif
609

610
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
17,550✔
611
  if (pSyncNode->peersNum == 0) {
17,550✔
612
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
12,858✔
613
    return 0;
12,858✔
614
  }
615

616
  int32_t ret = 0;
4,692✔
617
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1) {
4,692✔
618
    SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
1,466✔
619
    if (pSyncNode->peersNum == 2) {
1,466✔
620
      SyncIndex matchIndex0 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
959✔
621
      SyncIndex matchIndex1 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
959✔
622
      if (matchIndex1 > matchIndex0) {
959✔
623
        newLeader = (pSyncNode->peersNodeInfo)[1];
73✔
624
      }
625
    }
626
    ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
1,466✔
627
  }
628

629
  return ret;
4,692✔
630
}
631

632
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
1,466✔
633
  if (pSyncNode->replicaNum == 1) {
1,466!
634
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
×
635
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
636
  }
637

638
  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
1,466!
639

640
  SRpcMsg rpcMsg = {0};
1,466✔
641
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId));
1,466!
642

643
  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
1,466✔
644
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
1,466✔
645
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
1,466✔
646
  pMsg->newNodeInfo = newLeader;
1,466✔
647

648
  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
1,466✔
649
  rpcFreeCont(rpcMsg.pCont);
1,466✔
650
  return ret;
1,466✔
651
}
652

653
SSyncState syncGetState(int64_t rid) {
7,692,920✔
654
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
7,692,920✔
655

656
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
7,692,920✔
657
  if (pSyncNode != NULL) {
7,694,377✔
658
    state.state = pSyncNode->state;
7,694,274✔
659
    state.roleTimeMs = pSyncNode->roleTimeMs;
7,694,274✔
660
    state.startTimeMs = pSyncNode->startTime;
7,694,274✔
661
    state.restored = pSyncNode->restoreFinish;
7,694,274✔
662
    if (pSyncNode->vgId != 1) {
7,694,274✔
663
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,225,808✔
664
    } else {
665
      state.canRead = state.restored;
6,468,466✔
666
    }
667
    /*
668
    double progress = 0;
669
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
670
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
671
      state.progress = (int32_t)(progress * 100);
672
    }
673
    else{
674
      state.progress = -1;
675
    }
676
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
677
            "progress:%lf, progress:%d",
678
          pSyncNode->vgId,
679
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
680
    */
681
    state.term = raftStoreGetTerm(pSyncNode);
7,694,275✔
682
    syncNodeRelease(pSyncNode);
7,694,446✔
683
  }
684

685
  return state;
7,694,310✔
686
}
687

688
SSyncMetrics syncGetMetrics(int64_t rid) {
×
689
  SSyncMetrics metrics = {0};
×
690

691
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
692
  if (pSyncNode != NULL) {
×
693
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
694
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
695
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
696
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
697
    syncNodeRelease(pSyncNode);
×
698
  }
699
  return metrics;
×
700
}
701

702
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
703
  if (pOldMetrics == NULL) return;
×
704

705
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
706
  if (pSyncNode != NULL) {
×
707
    // Atomically subtract the old metrics values from current metrics
708
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
709
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
710
    syncNodeRelease(pSyncNode);
×
711
  }
712
}
713

714
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
1,210,701✔
715
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,210,701✔
716
  if (pSyncNode != NULL) {
1,210,701!
717
    *syncCommitIndex = pSyncNode->commitIndex;
1,210,701✔
718
    syncNodeRelease(pSyncNode);
1,210,701✔
719
  }
720
}
1,210,701✔
721

722
int32_t syncGetArbToken(int64_t rid, char* outToken) {
67,288✔
723
  int32_t    code = 0;
67,288✔
724
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
67,288✔
725
  if (pSyncNode == NULL) {
67,288!
726
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
727
    if (terrno != 0) code = terrno;
×
728
    TAOS_RETURN(code);
×
729
  }
730

731
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
67,288✔
732
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
67,288✔
733
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
67,288✔
734
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
67,288✔
735

736
  syncNodeRelease(pSyncNode);
67,288✔
737
  TAOS_RETURN(code);
67,288✔
738
}
739

740
int32_t syncCheckSynced(int64_t rid) {
4✔
741
  int32_t    code = 0;
4✔
742
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4✔
743
  if (pSyncNode == NULL) {
4!
744
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
745
    if (terrno != 0) code = terrno;
×
746
    TAOS_RETURN(code);
×
747
  }
748

749
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
4!
750
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
751
    syncNodeRelease(pSyncNode);
×
752
    TAOS_RETURN(code);
×
753
  }
754

755
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
4✔
756
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
4!
757

758
  syncNodeRelease(pSyncNode);
4✔
759
  TAOS_RETURN(code);
4✔
760
}
761

762
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
134✔
763
  int32_t    code = 0;
134✔
764
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
134✔
765
  if (pSyncNode == NULL) {
134!
766
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
767
    if (terrno != 0) code = terrno;
×
768
    TAOS_RETURN(code);
×
769
  }
770

771
  pSyncNode->arbTerm = TMAX(arbTerm, pSyncNode->arbTerm);
134✔
772
  syncNodeRelease(pSyncNode);
134✔
773
  TAOS_RETURN(code);
134✔
774
}
775

776
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
8,002,614✔
777
  if (pSyncNode->raftCfg.configIndexCount < 1) {
8,002,614!
778
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
×
779
           pSyncNode->raftCfg.configIndexCount);
780
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
781
    return -2;
×
782
  }
783
  SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
8,002,614✔
784

785
  for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
16,046,807✔
786
    if ((pSyncNode->raftCfg.configIndexArr)[i] > lastIndex &&
8,044,193✔
787
        (pSyncNode->raftCfg.configIndexArr)[i] <= snapshotLastApplyIndex) {
41,580✔
788
      lastIndex = (pSyncNode->raftCfg.configIndexArr)[i];
41,504✔
789
    }
790
  }
791
  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
8,002,614✔
792
         lastIndex);
793

794
  return lastIndex;
8,002,616✔
795
}
796

797
/*
 * Look up the raft id of the entry in peersNodeInfo whose fqdn and port both
 * match `pEp`. Returns EMPTY_RAFT_ID when no peer entry matches.
 */
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t idx = 0;
  while (idx < pSyncNode->peersNum) {
    const SNodeInfo* pPeer = &pSyncNode->peersNodeInfo[idx];
    if (pEp->port == pPeer->nodePort && strcmp(pEp->fqdn, pPeer->nodeFqdn) == 0) {
      return pSyncNode->peersId[idx];
    }
    ++idx;
  }
  return EMPTY_RAFT_ID;
}
806

807
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
678,699✔
808
  if (pEpSet == NULL || buffer == NULL) {
678,699!
809
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
810
    return;
×
811
  }
812

813
  size_t offset = 0;
678,732✔
814
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
678,732✔
815

816
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
1,509,162✔
817
    if (offset >= bufferSize) break;
830,430!
818
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
830,430✔
819
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
830,430✔
820
  }
821

822
  if (offset < bufferSize) {
678,732!
823
    snprintf(buffer + offset, bufferSize - offset, "]");
678,771✔
824
  }
825
}
826

827
/*
 * Fill `pEpSet` with the endpoints a client should retry against for sync node `rid`.
 *
 * Every non-learner member of raftCfg.cfg is added in config order. inUse is chosen as:
 *   1. the endpoint matching leaderCacheEp, if any;
 *   2. otherwise raftCfg.cfg.myIndex when this node's raft id equals leaderCache;
 *   3. otherwise (myIndex + 1) % numOfEps.
 * The set is then passed to epsetSort(). When the node cannot be acquired,
 * `pEpSet` is left with numOfEps == 0.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  SSyncCfg* pCfg = &pNode->raftCfg.cfg;
  int       leaderPos = -1;

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pNode->vgId, pNode->leaderCache.addr, pNode->leaderCacheEp.fqdn, pNode->leaderCacheEp.port);

  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;  // learners are not offered as retry targets
    }

    // The position in the epset equals the number of endpoints added so far.
    int  pos = pEpSet->numOfEps;
    SEp* pEp = &pEpSet->eps[pos];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;

    SRaftId id = syncGetRaftIdByEp(pNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pNode->vgId, i, id.addr, pEp->fqdn,
           pEp->port);

    bool matchesCachedLeader = (pEp->port == pNode->leaderCacheEp.port) &&
                               (strncmp(pEp->fqdn, pNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0);
    if (matchesCachedLeader) {
      leaderPos = pos;
    }
  }

  if (pEpSet->numOfEps > 0) {
    // NOTE(review): myIndex indexes cfg.nodeInfo (learners included) while the epset
    // skips learners, so the two only line up when no learner precedes this node in
    // nodeInfo. Confirm that learners are always placed after voters.
    if (leaderPos != -1) {
      pEpSet->inUse = leaderPos;
    } else if (pNode->myRaftId.addr == pNode->leaderCache.addr && pNode->myRaftId.vgId == pNode->leaderCache.vgId) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char desc[1024];
  epsetToString(pEpSet, desc, sizeof(desc));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pNode->vgId, pEpSet->numOfEps, desc,
         pEpSet->inUse);
  syncNodeRelease(pNode);
}
875

876
/*
 * Public entry point for proposing `pMsg`: acquire the sync node for `rid`, delegate to
 * syncNodePropose(), then release the node.
 * Returns syncNodePropose()'s result, or an error code when the node cannot be acquired.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return result;
  }

  int32_t code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
  if (terrno != 0) {
    code = terrno;
  }
  sError("sync propose error");
  TAOS_RETURN(code);
}
890

891
int32_t syncCheckMember(int64_t rid) {
×
892
  int32_t    code = 0;
×
893
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
894
  if (pSyncNode == NULL) {
×
895
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
896
    if (terrno != 0) code = terrno;
×
897
    sError("sync propose error");
×
898
    TAOS_RETURN(code);
×
899
  }
900

901
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
902
    syncNodeRelease(pSyncNode);
×
903
    return TSDB_CODE_SYN_WRONG_ROLE;
×
904
  }
905

906
  syncNodeRelease(pSyncNode);
×
907
  return 0;
×
908
}
909

910
int32_t syncIsCatchUp(int64_t rid) {
8,067✔
911
  int32_t    code = 0;
8,067✔
912
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
8,067✔
913
  if (pSyncNode == NULL) {
8,067!
914
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
915
    if (terrno != 0) code = terrno;
×
916
    sError("sync Node Acquire error since %d", ERRNO);
×
917
    TAOS_RETURN(code);
×
918
  }
919

920
  int32_t isCatchUp = 0;
8,067✔
921
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
8,067!
922
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
2,318!
923
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
2,318✔
924
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
7,677!
925
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
926
          pSyncNode->pLogBuf->matchIndex);
927
    isCatchUp = 0;
7,677✔
928
  } else {
929
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
390!
930
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
931
    isCatchUp = 1;
390✔
932
  }
933

934
  syncNodeRelease(pSyncNode);
8,067✔
935
  return isCatchUp;
8,067✔
936
}
937

938
ESyncRole syncGetRole(int64_t rid) {
8,067✔
939
  int32_t    code = 0;
8,067✔
940
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
8,067✔
941
  if (pSyncNode == NULL) {
8,067!
942
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
943
    if (terrno != 0) code = terrno;
×
944
    sError("sync Node Acquire error since %d", ERRNO);
×
945
    TAOS_RETURN(code);
×
946
  }
947

948
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
8,067✔
949

950
  syncNodeRelease(pSyncNode);
8,067✔
951
  return role;
8,067✔
952
}
953

954
int64_t syncGetTerm(int64_t rid) {
26,680✔
955
  int32_t    code = 0;
26,680✔
956
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
26,680✔
957
  if (pSyncNode == NULL) {
26,680!
958
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
959
    if (terrno != 0) code = terrno;
×
960
    sError("sync Node Acquire error since %d", ERRNO);
×
961
    TAOS_RETURN(code);
×
962
  }
963

964
  int64_t term = raftStoreGetTerm(pSyncNode);
26,680✔
965

966
  syncNodeRelease(pSyncNode);
26,680✔
967
  return term;
26,680✔
968
}
969

970
/*
 * Propose `pMsg` on `pSyncNode`, which must be in the LEADER or ASSIGNED_LEADER state.
 *
 * Returns:
 *   1 - optimized single-replica path: the request was handled directly by
 *       syncNodeOnClientRequest(), and pMsg->info.conn.applyIndex/applyTerm are filled in;
 *   0 - the message was handed to syncEqMsg for normal replication, or the optimized
 *       path ran while replicaNum is no longer 1 (member change);
 *   otherwise an error code (not leader, restore not finished, heartbeat reply
 *       timeout, optimized-path failure, serialization failure, or enqueue failure).
 * On the enqueue path `*seq` (when non-NULL) receives the response-stub sequence number.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate value.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub, but report the serialization failure to the caller. Previously the
    // cleanup result replaced `code`, so a failed proposal could be reported as success.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
             tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1042

1043
/*
 * Prepare the per-peer heartbeat timer `pSyncTimer` for peer `destId` without arming it.
 * Uses the node's hbBaseLine as the interval and syncNodeEqPeerHeartbeatTimer as the
 * callback. Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // What the timer targets and how often it fires.
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // Not armed yet: no timer handle, counters and logic clock reset.
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
1053

1054
/*
 * Arm the per-peer heartbeat timer `pSyncTimer` to fire after timerMS.
 *
 * The timer callback receives the rid of an SSyncHbTimerData record, not a raw pointer.
 * The record is reused when pSyncTimer->hbDataRid still resolves; otherwise a new one is
 * allocated and registered via syncHbTimerDataAdd().
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the record cannot be allocated, or
 * TSDB_CODE_SYN_INTERNAL_ERROR when the sync env is not initialized or
 * taosTmrResetPriority() reports failure.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    // NOTE(review): the reference taken by syncHbTimerDataAcquire() is not released in
    // this function. Confirm that the timer callback or syncHbTimerStop() balances it.
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(*pData));
      if (pData == NULL) {
        // Previously dereferenced unconditionally.
        sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
           pData->destId.addr, pSyncTimer->timerMS);

    bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                        syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
    if (stopped) {
      sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  } else {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return code;
}
1087

1088
/*
 * Disarm the per-peer heartbeat timer `pSyncTimer` and drop its SSyncHbTimerData
 * record (hbDataRid is reset to -1). Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // Advance the logic clock before stopping. This presumably lets an already-queued
  // callback recognize itself as stale (TODO confirm against the callback).
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool stopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1098

1099
/*
 * Reset the raft log store from the FSM snapshot when the log no longer lines up with it.
 *
 * A restore happens when the log ends before the snapshot's lastApplyIndex, when the log
 * starts after lastApplyIndex + 1 (a gap), or when fsmState is SYNC_FSM_STATE_INCOMPLETE.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or FSM hooks are missing,
 * otherwise the result of syncLogRestoreFromSnapshot() (0 when no restore was needed).
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapshotVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logEndsBeforeSnapshot = lastVer < snapshotVer;
  bool logStartsAfterSnapshot = firstVer > snapshotVer + 1;
  bool fsmIncomplete = pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;
  if (!logEndsBeforeSnapshot && !logStartsAfterSnapshot && !fsmIncomplete) {
    TAOS_RETURN(0);
  }

  sInfo("vgId:%d, restore log store from snapshot, firstVer:%" PRId64 ", lastVer:%" PRId64 ", commitIndex:%" PRId64,
        pNode->vgId, firstVer, lastVer, snapshotVer);
  int32_t code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
  }
  TAOS_RETURN(code);
}
1131

1132
// open/close --------------
1133
/*
 * Create and initialize a sync (raft) node from `pSyncInfo`.
 *
 * raft_config.json under pSyncInfo->path is created from pSyncInfo when absent.
 * Otherwise it is loaded, and it is replaced by pSyncInfo->syncCfg when `vnodeVersion`
 * is newer than the file's changeVersion and the configs differ. Ownership of
 * pSyncInfo->pFsm moves to the node (pSyncInfo->pFsm is set to NULL). Timers are
 * initialized here but started elsewhere (syncNodeStart).
 *
 * Returns the new node, or NULL on failure. On failure the partially built node is
 * released through syncNodeClose(), and terrno carries the error where the failing
 * step reports one.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->mountVgId = pSyncInfo->mountVgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // Report the version actually compared above: the one read from the cfg file.
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->mountVgId = pSyncInfo->mountVgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId (every configured member except this node)
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm; the node takes ownership of the FSM
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
  pSyncNode->leaderCacheEp.port = 0;
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error rather than returning directly, so the partially built node
      // (which already owns the FSM) is released by syncNodeClose().
      terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to create snapshot sender, index:%d since %s", pSyncNode->vgId, i, tstrerror(terrno));
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to create snapshot receiver since %s", pSyncNode->vgId, tstrerror(terrno));
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1500

1501
#ifdef BUILD_NO_CALL
// Raise commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is
// ahead of it. Does nothing if there is no FSM or it has no FpGetSnapshotInfo.
// Only compiled when BUILD_NO_CALL is defined.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  if (snapshot.lastApplyIndex <= pSyncNode->commitIndex) {
    return;
  }
  pSyncNode->commitIndex = snapshot.lastApplyIndex;
}
#endif
1512

1513
// Restore sync state from the persisted log before the node starts serving.
//
// Under the log buffer mutex, reads the log store's last index and commit index
// and the ring log buffer's end index. Then raises pSyncNode->commitIndex to the
// persisted commit index and, unless the FSM is in SYNC_FSM_STATE_INCOMPLETE,
// commits buffered entries up to that index via syncLogBufferCommit().
//
// A mismatch between the WAL's last index and the buffer's end index is only
// logged as a warning (TSDB_CODE_WAL_LOG_INCOMPLETE); it does not fail restore.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if the log store or log
// buffer has not been created, or the error returned by syncLogBufferCommit().
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    // Deliberately non-fatal. The code is kept in its own local so that:
    //  - the log prints this code rather than whatever terrno happens to hold;
    //  - it is not returned as this function's result when the commit below is
    //    skipped (fsmState == SYNC_FSM_STATE_INCOMPLETE), which previously turned
    //    this warning into a restore failure on that path only.
    int32_t warnCode = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(warnCode), endIndex - 1, lastVer);
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore")) < 0) {
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1548

1549
// Bring a freshly opened sync node into its initial raft role, then start the
// ping timer.
//
// - Learner in the config: becomes learner.
// - Voter in a single-replica group: advances the term, becomes leader and
//   appends a no-op entry (Raft 3.6.2, committing entries from previous terms).
// - Voter in a multi-replica group: becomes follower with an empty leader id.
//
// Returns the error from syncNodeAppendNoop(), otherwise the result of
// syncNodeStartPingTimer(). A ping-timer failure is logged and still returned.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t timerRc = syncNodeStartPingTimer(pSyncNode);
  if (timerRc != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(timerRc));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return timerRc;
}
1575

1576
#ifdef BUILD_NO_CALL
// Put the node into a standby follower state.
//
// Sets the state to follower and stamps roleTimeMs, stops heartbeat timers,
// restarts the elect timer with TIMER_MAX_MS, and starts the ping timer.
// Only compiled when BUILD_NO_CALL is defined.
//
// Returns 0 on success, otherwise the failing step's error code. Failures are
// now logged with that code (not a possibly stale terrno) and propagated
// instead of being collapsed to -1.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer to TIMER_MAX_MS (the original intent: "long enough")
  int32_t electMS = TIMER_MAX_MS;
  code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }
  return code;
}
#endif
1601

1602
// First shutdown phase.
//
// Stops the elect, heartbeat and ping timers, in that order, then cleans
// pending responses in the response manager.
//
// Returns immediately, with an error log, if pSyncNode, its FSM, or the FSM's
// FpApplyQueueItems callback is NULL. If stopping a timer fails, the error is
// logged and the remaining steps are skipped.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1638

1639
// Second shutdown phase: if a new-node snapshot receiver exists, stop it (when
// started), destroy it, and clear the pointer. No-op otherwise.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
1651

1652
// Release a heartbeat-timer data object by passing it to taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
1653

1654
// Tear down a sync node and free it. NULL is accepted and ignored.
//
// Order:
//  1. Clean pending responses.
//  2. Stop the ping, elect and heartbeat timers.
//  3. Destroy, in order: log-replication state, response manager, vote
//     managers, index managers, log store, log buffer, arb-token mutex, snapshot
//     senders (stopping started ones), the new-node snapshot receiver, and the
//     FSM.
//  4. Close the raft store and free the node itself.
//
// If stopping any timer fails, the error is logged and the function returns
// without releasing anything else; the node stays allocated.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    if (pSyncNode->senders[slot] == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);
    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }
    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1723

1724
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
1725

1726
// timer control --------------
1727
// Reset the ping timer: FpPingTimerCB every pingTimerMS at priority 2, keyed by
// the node's rid. On success, publishes pingTimerLogicClockUser into
// pingTimerLogicClock.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR if taosTmrResetPriority reports the timer
// as stopped. If the sync environment is not initialized, the error is only
// logged and 0 is returned.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1742

1743
// Stop the ping timer. Advances pingTimerLogicClockUser, stops the timer via
// taosTmrStop, and clears the handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1751

1752
// Arm the election timer to fire in `ms` milliseconds.
//
// Records ms in electTimerMS, fills electTimerParam (execute time = now + ms,
// current electTimerLogicClock, node pointer, no data), then resets the timer
// with FpElectTimerCB keyed by the node's rid. If taosTmrReset reports the
// timer as stopped, only a warning is logged.
//
// Always returns 0. If the sync environment is not initialized, the error is
// logged and nothing is armed.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadlineMs = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadlineMs);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1771

1772
// Stop the election timer. Advances electTimerLogicClock, stops the timer via
// taosTmrStop, and clears the handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1781

1782
// Stop, then re-arm, the election timer with a timeout of `ms` milliseconds.
// Returns the first non-zero code from either step, otherwise 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1788

1789
// Restart the election timer.
//
// Timeout is TIMER_MAX_MS for a standby node. Otherwise it is
// syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine). A restart failure is
// logged as a warning and the trace log is skipped.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1807

1808
#ifdef BUILD_NO_CALL
// Reset the node-level heartbeat timer: FpHeartbeatTimerCB every
// heartbeatTimerMS, keyed by the node's rid. On success, publishes
// heartbeatTimerLogicClockUser into heartbeatTimerLogicClock. Only compiled
// when BUILD_NO_CALL is defined.
//
// taosTmrReset() returns a bool ("stopped"), not an error code. Wrapping it in
// TAOS_CHECK_RETURN would have returned 1 as an "error code". It is now handled
// like syncNodeStartPingTimer() handles taosTmrResetPriority().
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if the timer reset reports
// stopped. If the sync environment is not initialized, the error is logged and
// 0 is returned.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
#endif
1823

1824
// Start the per-peer heartbeat timer for every peer that has one (as returned
// by syncNodeGetHbTimer). Peers without a timer are skipped.
// Returns 0, or the first non-zero code from syncHbTimerStart().
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }
  return 0;
}
1841

1842
// Stop the per-peer heartbeat timer for every peer that has one (as returned by
// syncNodeGetHbTimer). Peers without a timer are skipped.
// Returns 0, or the first non-zero code from syncHbTimerStop().
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }
  return 0;
}
1861

1862
#ifdef BUILD_NO_CALL
// Stop, then start, all per-peer heartbeat timers. Only compiled when
// BUILD_NO_CALL is defined. Returns the first non-zero code, otherwise 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
#endif
1871

1872
// Send pMsg to the peer whose address matches destRaftId->addr.
//
// The peer's endpoint is looked up in pNode->peersId / peersEpset. When found
// and a sender callback is installed, the body is converted with
// syncUtilMsgHtoN(), marked noResp, and handed to pNode->syncSendMSg.
//
// On failure (unknown peer, no sender, or a negative send result), the error is
// logged, pMsg->pCont is released with rpcFreeCont() and cleared so it cannot
// be freed twice through pMsg.
//
// Returns 0 or the sender's result. Returns TSDB_CODE_SYN_INTERNAL_ERROR when
// the message could not be dispatched at all (previously -1, which is not a
// TSDB code and made tstrerror() meaningless). terrno is set via TAOS_RETURN.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
    if (destRaftId->addr == pNode->peersId[i].addr) {
      epSet = &pNode->peersEpset[i];
      break;
    }
  }

  int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           epSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  TAOS_RETURN(code);
}
1896

1897
// Report whether this node is a member of pCfg.
//
// Membership is checked two ways:
//  - by FQDN + port against pNode->myNodeInfo;
//  - by raft id (SYNC_ADDR of each entry combined with pNode->vgId) against
//    pNode->myRaftId.
// If the two checks disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and
// false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool byEndpoint = false;
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum && !byEndpoint; ++idx) {
    byEndpoint = (strcmp(pCfg->nodeInfo[idx].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
                 (pCfg->nodeInfo[idx].nodePort == pNode->myNodeInfo.nodePort);
  }

  bool byRaftId = false;
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum && !byRaftId; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };
    byRaftId = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (byEndpoint != byRaftId) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return byEndpoint;
}
1927

1928
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
4,646✔
1929
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
4,646✔
1930
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
3,049✔
1931
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
7,401✔
1932
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
5,438✔
1933
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
5,438✔
1934
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
5,438!
1935
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
5,438✔
1936
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
5,436✔
1937
  }
1938

1939
  return false;
1,963✔
1940
}
1941

1942
// Apply pNewConfig as this node's raft configuration.
//
// If syncIsConfigChanged() reports no difference from the current config,
// nothing is modified and 0 is returned. Otherwise:
//  - the new config and lastConfigChangeIndex are stored and configChangeNum is
//    bumped;
//  - isStandBy is cleared when this node is in the new config, and set when it
//    was in the old one but not the new one;
//  - lastConfigChangeIndex is appended to raftCfg.configIndexArr.
//
// When this node is part of the new config, the following are rebuilt: its own
// raft id, peer infos/epsets/ids, replica ids, quorum, index managers, vote
// managers and snapshot senders. A sender whose replica id appears in both
// configs is carried over; missing ones are created and unused old ones are
// destroyed. The config file is written in either case.
//
// Returns:
//  - 0 on success;
//  - -1 with terrno = TSDB_CODE_OUT_OF_RANGE when configIndexArr is full
//    (checked before any state is modified);
//  - terrno when a node info cannot be converted to a raft id;
//  - the error code from snapshotSenderCreate() or syncWriteCfgFile().
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  int32_t  code = 0;
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // Log the real vgroup id (this message used to hard-code "vgId:1").
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  // Reject before mutating anything, so a refused change leaves the node's
  // configuration exactly as it was. The mutations below do not touch
  // configIndexCount, so checking here is equivalent to checking later.
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // Log begin config change. Arguments now match the conversion specifiers.
  // Previously an extra vgId argument shifted every value one slot left, so
  // an int32 was read as PRId64 and an int64 as %d (undefined behavior).
  // "replicas" reports the new config's replicaNum (assumed intent of the
  // trailing %d — confirm).
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false) {
        return terrno;
      }
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false) {
        return terrno;
      }
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: carry over the sender of every replica present in both configs
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new. On failure, stop creating but still run the "free old" pass
    // below. Old senders that were not carried over are referenced only by
    // oldSenders and would otherwise leak.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] != NULL) {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
        continue;
      }

      code = snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
      if (code != 0) {
        sNError(pSyncNode, "failed to create snapshot sender while reconfig, replica-index:%d since %s", i,
                tstrerror(code));
        break;
      }

      if (pSyncNode->senders[i] == NULL) {
        // will be created later while send snapshot
        sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    if (code != 0) {
      TAOS_RETURN(code);
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2114

2115
// raft state change --------------
2116
// Persist `term` as the current term only if it is newer than the stored one.
// Never lowers the term and does not change the node's role.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
2121

2122
// Step down in response to a message carrying newTerm from node `id`.
//
// - newTerm older than the stored term: ignored.
// - Node is an assigned leader: regenerates its arb token under arbTokenMutex
//   first.
// - newTerm newer: persists it, becomes follower of `id`, clears the vote.
// - newTerm equal: becomes follower only if not already one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2152

2153
// On a leadership change, clean pending responses held by the response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
2154

2155
// Transition the node to follower with leaderId as the cached leader.
//
// Steps:
//  1. If currently leader, clear the leader cache and its endpoint.
//  2. Record leaderId, and copy its endpoint from the config when it matches a
//     replica.
//  3. Switch state and stamp roleTimeMs, then stop heartbeat timers.
//  4. Clean pending responses and invoke FpBecomeFollowerCb if present.
//  5. Reset minMatchIndex and the log buffer, restart the election timer.
//
// If stopping heartbeat timers or resetting the log buffer fails, the error is
// logged and the remaining steps are skipped.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;
  pSyncNode->leaderCache = leaderId;

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    if (!syncUtilSameId(&pSyncNode->replicasId[r], &leaderId)) {
      continue;
    }
    pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[r].nodePort;
    strncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[r].nodeFqdn, TSDB_FQDN_LEN);
    break;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  int32_t rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2206

2207
// Transition the node to the learner role.
//
// Resets hbSlowNum, sets the state and roleTimeMs, invokes FpBecomeLearnerCb if
// present, resets minMatchIndex, and resets the log buffer. A log-buffer reset
// failure is logged.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
}
2232

2233
// TLA+ Spec
2234
// \* Candidate i transitions to leader.
2235
// BecomeLeader(i) ==
2236
//     /\ state[i] = Candidate
2237
//     /\ votesGranted[i] \in Quorum
2238
//     /\ state'      = [state EXCEPT ![i] = Leader]
2239
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2240
//                          [j \in Server |-> Len(log[i]) + 1]]
2241
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2242
//                          [j \in Server |-> 0]]
2243
//     /\ elections'  = elections \cup
2244
//                          {[eterm     |-> currentTerm[i],
2245
//                            eleader   |-> i,
2246
//                            elog      |-> log[i],
2247
//                            evotes    |-> votesGranted[i],
2248
//                            evoterLog |-> voterLog[i]]}
2249
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2250
//
2251
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
14,510✔
2252
  int32_t code = 0;
14,510✔
2253
  pSyncNode->becomeLeaderNum++;
14,510✔
2254
  pSyncNode->hbrSlowNum = 0;
14,510✔
2255

2256
  // reset restoreFinish
2257
  pSyncNode->restoreFinish = false;
14,510✔
2258

2259
  // state change
2260
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
14,510✔
2261
  pSyncNode->roleTimeMs = taosGetTimestampMs();
14,510✔
2262

2263
  // set leader cache
2264
  pSyncNode->leaderCache = pSyncNode->myRaftId;
14,510✔
2265
  strncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
14,510✔
2266
          TSDB_FQDN_LEN);
2267
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;
14,510✔
2268

2269
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
31,896✔
2270
    SyncIndex lastIndex;
2271
    SyncTerm  lastTerm;
2272
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
17,387✔
2273
    if (code != 0) {
17,387✔
2274
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
1!
2275
      return;
×
2276
    }
2277
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
17,386✔
2278
  }
2279

2280
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
31,896✔
2281
    // maybe overwrite myself, no harm
2282
    // just do it!
2283
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
17,387✔
2284
  }
2285

2286
  // init peer mgr
2287
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
14,509!
2288
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2289
    return;
×
2290
  }
2291

2292
#if 0
2293
  // update sender private term
2294
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
2295
  if (pMySender != NULL) {
2296
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
2297
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
2298
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
2299
      }
2300
    }
2301
    (pMySender->privateTerm) += 100;
2302
  }
2303
#endif
2304

2305
  // close receiver
2306
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
14,510!
2307
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2308
  }
2309

2310
  // stop elect timer
2311
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
14,510!
2312
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2313
    return;
×
2314
  }
2315

2316
  // start heartbeat timer
2317
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
14,508!
2318
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2319
    return;
×
2320
  }
2321

2322
  // send heartbeat right now
2323
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
14,509!
2324
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2325
    return;
×
2326
  }
2327

2328
  // call back
2329
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
14,509!
2330
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
14,510✔
2331
  }
2332

2333
  // min match index
2334
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
14,509✔
2335

2336
  // reset log buffer
2337
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
14,509!
2338
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2339
    return;
×
2340
  }
2341

2342
  // trace log
2343
  sNInfo(pSyncNode, "become leader %s", debugStr);
14,510✔
2344
}
2345

2346
/*
 * Transition this sync node to the assigned-leader role.
 *
 * This mirrors syncNodeBecomeLeader, with these differences:
 *  - it leaves restoreFinish untouched,
 *  - it bumps becomeAssignedLeaderNum,
 *  - it invokes pFsm->FpBecomeAssignedLeaderCb.
 *
 * It initializes nextIndex/matchIndex and peer state, stops the new-node snapshot receiver and the
 * election timer, starts heartbeats, invalidates minMatchIndex and resets the log buffer.
 *
 * Every failure is logged and aborts the remaining steps. `state` has already been set by then, so an
 * early return leaves the node partially transitioned.
 *
 * NOTE(review): unlike syncNodeBecomeLeader, leaderCacheEp is not refreshed here, so it may still hold
 * the previous leader's endpoint. Confirm whether that is intentional.
 */
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // restoreFinish is intentionally not reset here (it is reset in syncNodeBecomeLeader)

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Query last index/term once; it does not change while the replica indexes are initialized.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2424

2425
/*
 * Promote a candidate that has won a majority of votes to leader.
 *
 * The call is refused (error logged, nothing changed) when the node is not a candidate or the vote
 * manager does not report a majority. On promotion, a no-op entry is appended; an append failure is
 * logged but does not undo the promotion.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    // report the code actually returned rather than whatever terrno currently holds
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
2449

2450
/* Returns true when this sync node serves vgroup 1 (the mnode vgroup). */
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  return pSyncNode->vgId == mnodeVgId;
}
547,489✔
2451

2452
/*
 * Clear the per-peer send bookkeeping for every voter and learner slot.
 * Each slot's lastSendIndex becomes SYNC_INDEX_INVALID and its lastSendTime becomes 0.
 * Always returns 0.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int32_t       slot = 0;
  while (slot < slotCount) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }
  return 0;
}
2460

2461
/*
 * Move a follower into the candidate state and record the role-change time.
 * Any other starting state is rejected with an error log and leaves the node untouched.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
    pSyncNode->roleTimeMs = taosGetTimestampMs();

    SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
          pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

    sNTrace(pSyncNode, "follower to candidate");
  } else {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
  }
}
2474

2475
/*
 * Promote an assigned leader to a regular leader and append a no-op entry.
 *
 * @return TSDB_CODE_SYN_INTERNAL_ERROR if the node is not an assigned leader, otherwise 0.
 *         A failure to append the no-op entry is logged but not returned.
 */
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // ", " separator was missing before "assigned commit index", gluing the two numbers together
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2493

2494
// just called by syncNodeVoteForSelf
2495
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,786✔
2496
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,786✔
2497
  if (term != storeTerm) {
1,786!
2498
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2499
    return;
×
2500
  }
2501
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
1,786!
2502
  bool voted = raftStoreHasVoted(pSyncNode);
1,786✔
2503
  if (voted) {
1,786!
2504
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2505
    return;
×
2506
  }
2507

2508
  raftStoreVote(pSyncNode, pRaftId);
1,786✔
2509
}
2510

2511
// simulate get vote from outside
2512
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,786✔
2513
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,786✔
2514

2515
  SRpcMsg rpcMsg = {0};
1,786✔
2516
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,786✔
2517
  if (ret != 0) return;
1,786!
2518

2519
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,786✔
2520
  pMsg->srcId = pSyncNode->myRaftId;
1,786✔
2521
  pMsg->destId = pSyncNode->myRaftId;
1,786✔
2522
  pMsg->term = currentTerm;
1,786✔
2523
  pMsg->voteGranted = true;
1,786✔
2524

2525
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,786✔
2526
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,786✔
2527
  rpcFreeCont(rpcMsg.pCont);
1,786✔
2528
}
2529

2530
// return if has a snapshot
2531
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
23,778✔
2532
  bool      ret = false;
23,778✔
2533
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
23,778✔
2534
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
23,778!
2535
    // TODO check return value
2536
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
23,778✔
2537
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
23,778✔
2538
      ret = true;
2,943✔
2539
    }
2540
  }
2541
  return ret;
23,778✔
2542
}
2543

2544
// return max(logLastIndex, snapshotLastIndex)
2545
// if no snapshot and log, return -1
2546
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
24,074✔
2547
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
24,074✔
2548
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
24,074!
2549
    // TODO check return value
2550
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
24,074✔
2551
  }
2552
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
24,074✔
2553

2554
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
24,074✔
2555
  return lastIndex;
24,074✔
2556
}
2557

2558
// return the last term of snapshot and log
2559
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2560
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
23,778✔
2561
  SyncTerm lastTerm = 0;
23,778✔
2562
  if (syncNodeHasSnapshot(pSyncNode)) {
23,778✔
2563
    // has snapshot
2564
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,943✔
2565
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,943!
2566
      // TODO check return value
2567
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,943✔
2568
    }
2569

2570
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,943✔
2571
    if (logLastIndex > snapshot.lastApplyIndex) {
2,943✔
2572
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,624✔
2573
    } else {
2574
      lastTerm = snapshot.lastApplyTerm;
1,319✔
2575
    }
2576

2577
  } else {
2578
    // no snapshot
2579
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
20,835✔
2580
  }
2581

2582
  return lastTerm;
23,778✔
2583
}
2584

2585
// get last index and term along with snapshot
2586
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
20,811✔
2587
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
20,811✔
2588
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
20,811✔
2589
  return 0;
20,811✔
2590
}
2591

2592
#ifdef BUILD_NO_CALL
2593
// return append-entries first try index
2594
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2595
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2596
  return syncStartIndex;
2597
}
2598

2599
// if index > 0, return index - 1
2600
// else, return -1
2601
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2602
  SyncIndex preIndex = index - 1;
2603
  if (preIndex < SYNC_INDEX_INVALID) {
2604
    preIndex = SYNC_INDEX_INVALID;
2605
  }
2606

2607
  return preIndex;
2608
}
2609

2610
// if index < 0, return SYNC_TERM_INVALID
2611
// if index == 0, return 0
2612
// if index > 0, return preTerm
2613
// if error, return SYNC_TERM_INVALID
2614
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2615
  if (index < SYNC_INDEX_BEGIN) {
2616
    return SYNC_TERM_INVALID;
2617
  }
2618

2619
  if (index == SYNC_INDEX_BEGIN) {
2620
    return 0;
2621
  }
2622

2623
  SyncTerm  preTerm = 0;
2624
  SyncIndex preIndex = index - 1;
2625

2626
  SSyncRaftEntry* pPreEntry = NULL;
2627
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2628
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2629
  int32_t         code = 0;
2630
  if (h) {
2631
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2632
    code = 0;
2633

2634
    pSyncNode->pLogStore->cacheHit++;
2635
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2636

2637
  } else {
2638
    pSyncNode->pLogStore->cacheMiss++;
2639
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2640

2641
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2642
  }
2643

2644
  SSnapshot snapshot = {.data = NULL,
2645
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2646
                        .lastApplyTerm = SYNC_TERM_INVALID,
2647
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2648

2649
  if (code == 0) {
2650
    if (pPreEntry == NULL) return -1;
2651
    preTerm = pPreEntry->term;
2652

2653
    if (h) {
2654
      taosLRUCacheRelease(pCache, h, false);
2655
    } else {
2656
      syncEntryDestroy(pPreEntry);
2657
    }
2658

2659
    return preTerm;
2660
  } else {
2661
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2662
      // TODO check return value
2663
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2664
      if (snapshot.lastApplyIndex == preIndex) {
2665
        return snapshot.lastApplyTerm;
2666
      }
2667
    }
2668
  }
2669

2670
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2671
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2672
  return SYNC_TERM_INVALID;
2673
}
2674

2675
// get pre index and term of "index"
2676
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2677
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2678
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2679
  return 0;
2680
}
2681
#endif
2682

2683
static void syncNodeEqPingTimer(void* param, void* tmrId) {
255,299✔
2684
  if (!syncIsInit()) return;
255,299!
2685

2686
  int64_t    rid = (int64_t)param;
255,299✔
2687
  SSyncNode* pNode = syncNodeAcquire(rid);
255,299✔
2688

2689
  if (pNode == NULL) return;
255,300!
2690

2691
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
255,300!
2692
    SRpcMsg rpcMsg = {0};
255,300✔
2693
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
255,300✔
2694
                                    pNode->pingTimerMS, pNode);
2695
    if (code != 0) {
255,299!
2696
      sError("failed to build ping msg");
×
2697
      rpcFreeCont(rpcMsg.pCont);
×
2698
      goto _out;
×
2699
    }
2700

2701
    // sTrace("enqueue ping msg");
2702
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
255,299✔
2703
    if (code != 0) {
255,299✔
2704
      sError("failed to sync enqueue ping msg since %s", terrstr());
1!
2705
      rpcFreeCont(rpcMsg.pCont);
1✔
2706
      goto _out;
1✔
2707
    }
2708

2709
  _out:
255,298✔
2710
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
255,299!
2711
                     &pNode->pPingTimer))
2712
      sError("failed to reset ping timer");
×
2713
  }
2714
  syncNodeRelease(pNode);
255,300✔
2715
}
2716

2717
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,803✔
2718
  if (!syncIsInit()) return;
1,819!
2719

2720
  int64_t    rid = (int64_t)param;
1,803✔
2721
  SSyncNode* pNode = syncNodeAcquire(rid);
1,803✔
2722

2723
  if (pNode == NULL) return;
1,803✔
2724

2725
  if (pNode->syncEqMsg == NULL) {
1,798!
2726
    syncNodeRelease(pNode);
×
2727
    return;
×
2728
  }
2729

2730
  int64_t tsNow = taosGetTimestampMs();
1,798✔
2731
  if (tsNow < pNode->electTimerParam.executeTime) {
1,798✔
2732
    syncNodeRelease(pNode);
11✔
2733
    return;
11✔
2734
  }
2735

2736
  SRpcMsg rpcMsg = {0};
1,787✔
2737
  int32_t code =
2738
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,787✔
2739

2740
  if (code != 0) {
1,787!
2741
    sError("failed to build elect msg");
×
2742
    syncNodeRelease(pNode);
×
2743
    return;
×
2744
  }
2745

2746
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,787✔
2747
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,787!
2748

2749
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,787✔
2750
  if (code != 0) {
1,787!
2751
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2752
    rpcFreeCont(rpcMsg.pCont);
×
2753
    syncNodeRelease(pNode);
×
2754
    return;
×
2755
  }
2756

2757
  syncNodeRelease(pNode);
1,787✔
2758
}
2759

2760
#ifdef BUILD_NO_CALL
2761
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2762
  if (!syncIsInit()) return;
2763

2764
  int64_t    rid = (int64_t)param;
2765
  SSyncNode* pNode = syncNodeAcquire(rid);
2766

2767
  if (pNode == NULL) return;
2768

2769
  if (pNode->totalReplicaNum > 1) {
2770
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2771
      SRpcMsg rpcMsg = {0};
2772
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2773
                                      pNode->heartbeatTimerMS, pNode);
2774

2775
      if (code != 0) {
2776
        sError("failed to build heartbeat msg");
2777
        goto _out;
2778
      }
2779

2780
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2781
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2782
      if (code != 0) {
2783
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2784
        rpcFreeCont(rpcMsg.pCont);
2785
        goto _out;
2786
      }
2787

2788
    _out:
2789
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2790
                       &pNode->pHeartbeatTimer) != 0)
2791
        return;
2792

2793
    } else {
2794
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2795
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2796
    }
2797
  }
2798
}
2799
#endif
2800

2801
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
49,748✔
2802
  int32_t code = 0;
49,748✔
2803
  int64_t hbDataRid = (int64_t)param;
49,748✔
2804
  int64_t tsNow = taosGetTimestampMs();
49,748✔
2805

2806
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
49,748✔
2807
  if (pData == NULL) {
49,748!
2808
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2809
    return;
×
2810
  }
2811

2812
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
49,748✔
2813
  if (pSyncNode == NULL) {
49,748✔
2814
    syncHbTimerDataRelease(pData);
4✔
2815
    sError("hb timer get pSyncNode NULL");
4!
2816
    return;
4✔
2817
  }
2818

2819
  SSyncTimer* pSyncTimer = pData->pTimer;
49,744✔
2820

2821
  if (!pSyncNode->isStart) {
49,744!
2822
    syncNodeRelease(pSyncNode);
×
2823
    syncHbTimerDataRelease(pData);
×
2824
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2825
    return;
×
2826
  }
2827

2828
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
49,744!
2829
    syncNodeRelease(pSyncNode);
×
2830
    syncHbTimerDataRelease(pData);
×
2831
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2832
    return;
×
2833
  }
2834

2835
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
49,744!
2836
         pData->destId.addr);
2837

2838
  if (pSyncNode->totalReplicaNum > 1) {
49,744✔
2839
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
49,740✔
2840
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
49,740✔
2841

2842
    if (timerLogicClock == msgLogicClock) {
49,740✔
2843
      if (tsNow > pData->execTime) {
49,734✔
2844
        pData->execTime += pSyncTimer->timerMS;
49,676✔
2845

2846
        SRpcMsg rpcMsg = {0};
49,676✔
2847
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
49,676!
2848
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2849
          syncNodeRelease(pSyncNode);
×
2850
          syncHbTimerDataRelease(pData);
×
2851
          return;
×
2852
        }
2853

2854
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
49,676✔
2855

2856
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
49,676✔
2857
        pSyncMsg->srcId = pSyncNode->myRaftId;
49,676✔
2858
        pSyncMsg->destId = pData->destId;
49,676✔
2859
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
49,676✔
2860
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
49,676✔
2861
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
49,676✔
2862
        pSyncMsg->privateTerm = 0;
49,676✔
2863
        pSyncMsg->timeStamp = tsNow;
49,676✔
2864

2865
        // update reset time
2866
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
49,676✔
2867
        pSyncTimer->timeStamp = tsNow;
49,676✔
2868

2869
        // send msg
2870
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
49,676✔
2871
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
49,676✔
2872
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime, &(rpcMsg.info.traceId));
49,676✔
2873
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
49,676✔
2874
        if (ret != 0) {
49,676✔
2875
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
103!
2876
        }
2877
      }
2878

2879
      if (syncIsInit()) {
49,734!
2880
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
49,734!
2881
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
49,734✔
2882
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
49,734✔
2883
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
49,734!
2884

2885
      } else {
2886
        sError("sync env is stop, reset peer hb timer error");
×
2887
      }
2888

2889
    } else {
2890
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
6!
2891
             timerLogicClock, msgLogicClock);
2892
    }
2893
  }
2894

2895
  syncHbTimerDataRelease(pData);
49,744✔
2896
  syncNodeRelease(pSyncNode);
49,744✔
2897
}
2898

2899
#ifdef BUILD_NO_CALL
2900
// LRU deleter callback: frees the cached value with taosMemoryFree; key, keyLen and ud are unused.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;
  taosMemoryFree(value);
}
2904

2905
/*
 * Insert pEntry into the log store's LRU cache, keyed by its index, at low priority.
 * The charge is sizeof(*pEntry) + dataLen, and deleteCacheEntry is the deleter.
 * Returns 0 on TAOS_LRU_STATUS_OK, -1 otherwise.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);
  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2919
#endif
2920

2921
/*
 * Fill `cfg` from an alter-vnode-replica request.
 *
 * Layout of cfg->nodeInfo:
 *  - voters from pReq->replicas occupy slots [0, replica),
 *  - learners from pReq->learnerReplicas follow immediately after them.
 *
 * For each node, tmsgUpdateDnodeInfo() is called on its id/clusterId/fqdn/port; its bool result is
 * only logged.
 *
 * Resulting fields:
 *  - replicaNum is the voter count,
 *  - totalReplicaNum is replica + learner count,
 *  - myIndex is set from selfIndex, then overridden by replica + learnerSelfIndex (a value of -1
 *    leaves myIndex unchanged),
 *  - changeVersion is copied from the request.
 *
 * NOTE(review): no bounds check against the capacity of cfg->nodeInfo or the request arrays;
 * presumably the request is validated upstream. Confirm.
 * TODO: is SAlterVnodeReplicaReq the proper request type name here?
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners, appended right after the voters; learnerNum indexes pReq->learnerReplicas
  int32_t learnerNum = 0;
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[learnerNum].id;
    pNode->nodePort = pReq->learnerReplicas[learnerNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[learnerNum].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    learnerNum++;
  }
  cfg->totalReplicaNum = learnerNum + pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
2957

2958
/*
 * Inspect a config-change log entry and step down if this (assigned) leader is not part of the
 * new configuration.
 *
 * Returns:
 *  - TSDB_CODE_SYN_INTERNAL_ERROR when the entry is not TDMT_SYNC_CONFIG_CHANGE,
 *  - TSDB_CODE_INVALID_MSG when the payload fails to deserialize,
 *  - 1 when the node stepped down (same term, empty raft id),
 *  - 0 otherwise.
 *
 * Membership is matched on fqdn + port of myNodeInfo.
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead*          head = (SMsgHead*)pEntry->data;
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(POINTER_SHIFT(head, sizeof(SMsgHead)), head->contLen, &req) != 0) {
    int32_t code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  bool actsAsLeader = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (cfg.totalReplicaNum < 1 || !actsAsLeader) {
    return 0;
  }

  for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
    bool samePort = ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort;
    if (samePort && strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0) {
      return 0;  // still a member of the new configuration
    }
  }

  // this leader was removed by the new configuration: step down in the current term
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
2996

2997
/*
 * Dump this node's sync membership state to the info log.
 *
 * Logs the node counters, myNodeInfo, every peer (node info, endpoint set,
 * raft id) and every entry of raftCfg.cfg (node info and replica raft id).
 *
 * @param ths  sync node to describe
 * @param cfg  not read by this function; kept for interface compatibility
 * @param str  tag placed in every line (e.g. "before config change")
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  (void)cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    // Render peer i's endpoint set as "{fqdn:port, fqdn:port}". peersEpset must be
    // indexed by i here, otherwise every line would show peer 0's endpoints.
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    int32_t numOfEps = ths->peersEpset[i].numOfEps;

    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t k = 0; k < numOfEps; ++k) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[k].fqdn, ths->peersEpset[i].eps[k].port,
                     (k + 1 < numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3043

3044
/*
 * Rebuild the peer tables and raftCfg.cfg membership from `cfg`.
 *
 * Pass 1 copies every member other than this node (matched by fqdn + port)
 * into peersNodeInfo, derives its peersEpset and peersId, and sets peersNum.
 * Pass 2 copies all members into raftCfg.cfg.nodeInfo in order, counts voters
 * into replicaNum, records this node's slot in myIndex and sets
 * totalReplicaNum.
 *
 * @return 0 on success, or terrno if a peer's raft id cannot be determined
 *         (peersNum is then left unchanged).
 */
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  int32_t nPeer = 0;
  for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[src].nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == cfg->nodeInfo[src].nodePort;
    if (isSelf) {
      continue;
    }

    ths->peersNodeInfo[nPeer].nodeRole = cfg->nodeInfo[src].nodeRole;
    ths->peersNodeInfo[nPeer].clusterId = cfg->nodeInfo[src].clusterId;
    tstrncpy(ths->peersNodeInfo[nPeer].nodeFqdn, cfg->nodeInfo[src].nodeFqdn, TSDB_FQDN_LEN);
    ths->peersNodeInfo[nPeer].nodeId = cfg->nodeInfo[src].nodeId;
    ths->peersNodeInfo[nPeer].nodePort = cfg->nodeInfo[src].nodePort;

    syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[nPeer], &ths->peersEpset[nPeer]);

    if (!syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[nPeer], ths->vgId, &ths->peersId[nPeer])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, nPeer);
      return terrno;
    }
    ++nPeer;
  }
  ths->peersNum = nPeer;

  // Every member (including this node) lands at the same position in raftCfg.cfg.
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t slot = 0;
  for (; slot < cfg->totalReplicaNum; ++slot) {
    if (cfg->nodeInfo[slot].nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }
    ths->raftCfg.cfg.nodeInfo[slot].nodeRole = cfg->nodeInfo[slot].nodeRole;
    ths->raftCfg.cfg.nodeInfo[slot].clusterId = cfg->nodeInfo[slot].clusterId;
    tstrncpy(ths->raftCfg.cfg.nodeInfo[slot].nodeFqdn, cfg->nodeInfo[slot].nodeFqdn, TSDB_FQDN_LEN);
    ths->raftCfg.cfg.nodeInfo[slot].nodeId = cfg->nodeInfo[slot].nodeId;
    ths->raftCfg.cfg.nodeInfo[slot].nodePort = cfg->nodeInfo[slot].nodePort;

    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[slot].nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == cfg->nodeInfo[slot].nodePort;
    if (isSelf) {
      ths->raftCfg.cfg.myIndex = slot;
    }
  }
  ths->raftCfg.cfg.totalReplicaNum = slot;

  return 0;
}
3092

3093
/*
 * Promote to voter every peer and every raftCfg.cfg member that `cfg` lists
 * as a voter (matched by fqdn + port). Roles are only ever raised, never
 * lowered. raftCfg.cfg.replicaNum is recounted as the number of such matches.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      bool sameNode = strcmp(ths->peersNodeInfo[p].nodeFqdn, cfg->nodeInfo[c].nodeFqdn) == 0 &&
                      ths->peersNodeInfo[p].nodePort == cfg->nodeInfo[c].nodePort;
      if (sameNode && cfg->nodeInfo[c].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->peersNodeInfo[p].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }
  }

  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t m = 0; m < ths->raftCfg.cfg.totalReplicaNum; ++m) {
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      bool sameNode = strcmp(ths->raftCfg.cfg.nodeInfo[m].nodeFqdn, cfg->nodeInfo[c].nodeFqdn) == 0 &&
                      ths->raftCfg.cfg.nodeInfo[m].nodePort == cfg->nodeInfo[c].nodePort;
      if (sameNode && cfg->nodeInfo[c].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->raftCfg.cfg.nodeInfo[m].nodeRole = TAOS_SYNC_ROLE_VOTER;
        ths->raftCfg.cfg.replicaNum++;
      }
    }
  }
}
×
3120

3121
/*
 * Rebuild the per-replica runtime state of `ths` after raftCfg.cfg has been
 * replaced, carrying over state for members present before and after the change
 * (matched by raft id).
 *
 * Steps: 1 replicasId, 2 pMatchIndex, 3 pNextIndex, 4 vote managers,
 * 5 logReplMgrs, 6 snapshot senders, 7 heartbeat timers, 8 peerStates.
 *
 * @param ths                 sync node whose raftCfg.cfg already holds the new membership
 * @param oldtotalReplicaNum  totalReplicaNum before the change; bounds the old arrays
 * @return 0 on success, otherwise an error code (also stored in terrno for TAOS_RETURN paths)
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;
  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // Restore the old manager rather than leaking it and leaving a NULL pointer behind.
    ths->pMatchIndex = oldIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    // Same as above: keep the old manager reachable on allocation failure.
    ths->pNextIndex = oldNextIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // Snapshot of the old managers, so their state can be copied into the new slots.
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  SSyncLogReplMgr* oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;  // the member may have moved to a new slot
      }
    }
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      // Release the log-repl snapshot on this error path too.
      taosMemoryFree(oldLogReplMgrs);
      return terrno = code;
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      taosMemoryFree(oldLogReplMgrs);
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);

  return 0;
}
3272

3273
/*
 * Apply a learner->voter promotion (e.g. 1->3) to the runtime state without
 * rebuilding it: only the voter counts of the node and of pMatchIndex /
 * pNextIndex are refreshed from raftCfg.cfg.replicaNum, then the vote
 * managers are updated. replicasId and logReplMgrs are left as they are.
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  const int32_t voterNum = ths->raftCfg.cfg.replicaNum;

  ths->replicaNum = voterNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; r++) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  ths->pMatchIndex->replicaNum = voterNum;
  ths->pNextIndex->replicaNum = voterNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  for (int32_t r = 0; r < ths->pMatchIndex->totalReplicaNum; r++) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, ths->pMatchIndex->index[r]);
  }

  // Refresh the granted / respond vote managers against the updated node state.
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);
}
×
3297

3298
/*
 * Zero the node-info slots currently in use: the first peersNum entries of
 * peersNodeInfo and the first raftCfg.cfg.totalReplicaNum entries of
 * raftCfg.cfg.nodeInfo. The counters themselves are not modified.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t k = 0; k < ths->peersNum; k++) {
    memset(&ths->peersNodeInfo[k], 0, sizeof(ths->peersNodeInfo[k]));
  }

  for (int32_t k = 0; k < ths->raftCfg.cfg.totalReplicaNum; k++) {
    memset(&ths->raftCfg.cfg.nodeInfo[k], 0, sizeof(ths->raftCfg.cfg.nodeInfo[k]));
  }
}
×
3308

3309
/*
 * Apply a TDMT_SYNC_CONFIG_CHANGE raft entry to this node.
 *
 * Entries whose changeVersion is not newer than the current one are skipped.
 * Otherwise, depending on the new configuration:
 *   - totalReplicaNum 1 or 2: replicas are removed; if this node is not in
 *     the new config it becomes a lone learner;
 *   - current totalReplicaNum == 3: replica roles change (learner -> voter);
 *   - otherwise: replicas are added.
 * Afterwards quorum, lastConfigIndex, lastIndex and changeVersion are updated
 * and the sync cfg file is written.
 *
 * @param ths     sync node
 * @param pEntry  config-change raft entry
 * @param str     caller tag used in logs; "Commit" selects the commit-style log line
 * @return 0 on success (including skipped entries), otherwise an error code
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {  // remove replica

    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        incfg = true;
        break;
      }
    }

    if (incfg) {  // remove other
      syncNodeResetPeerAndCfg(ths);

      // no need to change myNodeInfo

      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }
    } else {  // remove myself
      // no need to do anything actually, to change the following to reduce distruptive server chance

      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: this node becomes the single entry, at slot 0. myIndex must
      // follow, otherwise the persisted cfg would point at a slot that was just zeroed.
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;
      ths->raftCfg.cfg.myIndex = 0;

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else {                            // add replica, or change replica type
    if (ths->totalReplicaNum == 3) {  // change replica type
      sInfo("vgId:%d, begin change replica type", ths->vgId);

      // change myNodeInfo
      for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
        if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
            ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
          if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
          }
        }
      }

      // change peer and cfg
      syncNodeChangePeerAndCfgToVoter(ths, &cfg);

      // change other
      syncNodeChangeToVoter(ths);

      // change state
      if (ths->state == TAOS_SYNC_STATE_LEARNER) {
        if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
        }
      }

      ths->restoreFinish = false;
    } else {  // add replica
      sInfo("vgId:%d, begin add replica", ths->vgId);

      // no need to change myNodeInfo

      // change peer and cfg
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // no need to change state

      if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
        ths->restoreFinish = false;
      }
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  if ((code = syncWriteCfgFile(ths)) != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3482

3483
/*
 * Append a raft entry to the sync log buffer and advance the match/commit state.
 *
 * On rejection (no SMsgHead) or buffer-append failure the entry is destroyed
 * here (on append failure syncFsmExecute is first called with terrno). In
 * every case syncLogBufferProceed runs afterwards.
 *
 * For an assigned leader (on success), the assigned commit index is updated
 * and committed. With replicaNum > 1 the function returns after that. With a
 * single replica it also updates ths->commitIndex and commits the log buffer.
 *
 * NOTE(review): in the single-replica path, `code` is overwritten by the
 * syncLogBufferCommit result, so an earlier append/assigned-commit error is not
 * returned to the caller. Confirm whether this is intended.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
    if (code < 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
      int32_t fsmCode = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
      if (fsmCode != 0) {
        sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index,
               tstrerror(fsmCode));
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    } else {
      code = 0;
    }
  }

  // Proceed the match index (replicating when needed) whatever happened above.
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = (pEntry != NULL) ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  bool assignedLeader = (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (code == 0 && assignedLeader) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
              pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  if (ths->replicaNum > 1) {
    // multi replica: the commit index is not advanced here
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry");
    if (code < 0) {
      sGError(trace,
              "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
              ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
    }
  }

  TAOS_RETURN(code);
}
3551

3552
/*
 * Report whether at least `quorum` non-learner peers have not answered a
 * heartbeat within tsHeartbeatTimeout ms. Peers whose receive time is 0 or -1
 * (as returned by syncIndexMgrGetRecvTime) are not counted. A node with
 * totalReplicaNum == 1 never times out.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t nTimedOut = 0;
  int64_t now = taosGetTimestampMs();
  for (int32_t p = 0; p < pSyncNode->peersNum; p++) {
    if (pSyncNode->peersNodeInfo[p].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
      int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
      bool    hasRecv = (lastRecv != 0 && lastRecv != -1);
      if (hasRecv && now - lastRecv > tsHeartbeatTimeout) {
        nTimedOut++;
      }
    }
  }

  return nTimedOut >= pSyncNode->quorum;
}
3577

3578
/*
 * Return true if any of the first totalReplicaNum snapshot senders exists and
 * has been started; false otherwise (including for a NULL node).
 */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t k = 0; k < pSyncNode->totalReplicaNum; k++) {
    if (pSyncNode->senders[k] != NULL && pSyncNode->senders[k]->start) {
      return true;
    }
  }
  return false;
}
3589

3590
/*
 * Return true when the node has a snapshot receiver (pNewNodeReceiver) that has
 * been started; false if the node or the receiver is NULL or not started.
 */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}
3596

3597
/*
 * Build a no-op raft entry at the log buffer's end index in the current term
 * and append it via syncNodeAppend (with no RPC message).
 *
 * @return TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built, otherwise the
 *         result of syncNodeAppend (stored in terrno by TAOS_RETURN).
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex nextIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, nextIndex, ths->vgId);
  int32_t         code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop, NULL);
  TAOS_RETURN(code);
}
3611

3612
#ifdef BUILD_NO_CALL
/*
 * Legacy no-op append through pLogStore (compiled only with BUILD_NO_CALL).
 *
 * As leader, appends a no-op entry at the log store's write index and caches
 * it. A cached entry is released through its LRU handle; otherwise the entry is
 * destroyed here.
 *
 * @return 0 on success, -1 if the entry cannot be built or appended.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // The entry was neither cached nor released: free it so it does not leak.
      // NOTE(review): assumes syncLogAppendEntry does not take ownership on failure — confirm.
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
#endif
3642

3643
/*
 * Handle a SyncHeartbeat from a peer.
 *
 * Records the receive time/count in pNextIndex and drops messages from nodes
 * outside this raft group. A learner adopts a newer term. If the term matches
 * and this node is not leader, the election timer is reset afterwards and, for
 * a follower/learner, a local commit command is enqueued. If this node is
 * (assigned) leader and the message term is >= its own, a local step-down
 * command is enqueued. Finally a SyncHeartbeatReply is sent to the sender.
 *
 * @return 0 on success (including dropped messages), otherwise the error from
 *         building a local command/reply or from sending the reply.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff, pRpcMsg);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue sync-local-cmd msg(cmd=commit) from heartbeat since %s",
                 ths->vgId, tstrerror(code));
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue sync-local-cmd msg(cmd=commit) from heartbeat, commit-index:%" PRId64
                  ", term:%" PRId64,
                  ths->vgId, pMsg->commitIndex, pMsg->term);
        }
      } else {
        // Nothing to enqueue into: release the command built above instead of leaking it.
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down) error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down), new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // Nothing to enqueue into: release the command built above instead of leaking it.
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else if (tsSyncLogHeartbeat) {
    sGInfo(&rpcMsg.info.traceId,
           "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
           ", processTime:%" PRId64,
           ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    sGDebug(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3785

3786
/*
 * Handle a SyncHeartbeatReply from a peer: log it, record the receive time and
 * count in pMatchIndex, and hand it to the peer's log replication manager.
 *
 * @return the result of syncLogReplProcessHeartbeatReply, or an error code if
 *         no log replication manager exists for the sender.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "%016" must follow the '%' to zero-pad the address to 16 hex digits.
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  syncIndexMgrIncRecvCount(ths->pMatchIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3807

3808
#ifdef BUILD_NO_CALL
/*
 * Legacy heartbeat-reply handler; only compiled when BUILD_NO_CALL is defined.
 *
 * Reports the reply (the same elapsed value is passed for both timing arguments)
 * and stamps the sender's receive time in the match-index manager. Always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  int64_t nowMs = taosGetTimestampMs();
  int64_t elapsedMs = nowMs - pReply->timeStamp;
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // Remember when this peer last answered; per the original note this feeds the
  // decision of whether the peer is alive.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3821

3822
/**
 * Handle a local command message addressed to this sync node.
 *
 * - SYNC_LOCAL_CMD_STEP_DOWN: step down to pMsg->currentTerm (with an empty raft id).
 * - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: if the log buffer's last
 *   matched term equals pMsg->currentTerm, advance the commit index towards
 *   pMsg->commitIndex; then, unless the FSM state is SYNC_FSM_STATE_INCOMPLETE, commit
 *   the log buffer up to ths->commitIndex.
 * - Any other command is logged as an error.
 *
 * @return TSDB_CODE_SYN_INTERNAL_ERROR if the log buffer reports a negative last-match
 *         term; 0 otherwise (empty buffer, commit failure and unknown commands are only
 *         logged).
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pMsg, &pRpcMsg->info.traceId);

  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId id = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pMsg->currentTerm, id);

  } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
    if (syncLogBufferIsEmpty(ths->pLogBuf)) {
      sError("vgId:%d, sync log buffer is empty", ths->vgId);
      return 0;
    }
    SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
    if (matchTerm < 0) {
      // Previously returned silently; log so the internal error is diagnosable.
      sError("vgId:%d, invalid last match term:%" PRId64 " in sync log buffer", ths->vgId, matchTerm);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    // Only move the commit index when the command was issued in the term the local
    // log last matched.
    if (pMsg->currentTerm == matchTerm) {
      SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
      sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
    }
    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
             ths->commitIndex);
    }
  } else {
    // Include the node and the offending command value so the message is actionable.
    sError("vgId:%d, error local cmd:%d", ths->vgId, (int32_t)pMsg->cmd);
  }

  return 0;
}
3854

3855
// TLA+ Spec
3856
// ClientRequest(i, v) ==
3857
//     /\ state[i] = Leader
3858
//     /\ LET entry == [term  |-> currentTerm[i],
3859
//                      value |-> v]
3860
//            newLog == Append(log[i], entry)
3861
//        IN  log' = [log EXCEPT ![i] = newLog]
3862
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3863
//                    leaderVars, commitIndex>>
3864
//
3865

3866
/**
 * Turn a client request into a raft entry and, when this node leads, append it.
 *
 * The entry is built at the log buffer's current end index with the current raft term:
 * via syncEntryBuildFromClientRequest for TDMT_SYNC_CLIENT_REQUEST messages, otherwise
 * via syncEntryBuildFromRpcMsg. For TDMT_SYNC_CONFIG_CHANGE the original message is
 * saved as a response stub and its sequence number is stored in the entry.
 *
 * @param ths        sync node
 * @param pMsg       incoming request
 * @param pRetIndex  optional; receives the index assigned to the entry when this node is
 *                   leader or assigned leader
 * @return TSDB_CODE_SYN_INTERNAL_ERROR if the entry cannot be built or this node is not
 *         (assigned) leader; for config changes, the non-zero result of
 *         syncNodeCheckChangeConfig; otherwise the result of syncNodeAppend.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2: a config change is added in the write thread and continues in the sync thread,
  // so the original message is saved here to reply to it later.
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // NOTE(review): for TDMT_SYNC_CONFIG_CHANGE the stub registered above is not removed on
    // this path; presumably the resp mgr expires it later — confirm.
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since state is %s",
            ths->vgId, pMsg, syncStr(ths->state));
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    // Reuse the function-level `code`; the former inner declaration shadowed it.
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // A positive result means the entry is not appended: answer the saved stub now.
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }
  }

  // pEntry is not freed here on this path; presumably syncNodeAppend takes ownership — confirm.
  code = syncNodeAppend(ths, pEntry, pMsg);
  return code;
}
3930

3931
/*
 * Return a static, human-readable name for a sync role/state, for use in log
 * messages. Values that are not a known ESyncState member yield "unknown".
 */
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      break;
  }

  return name;
}
3951

3952
/*
 * Locate this node inside a new configuration and record its position.
 *
 * Each nodeInfo entry of pNewCfg is turned into a raft id (its address plus this
 * node's vgId) and compared with ths->myRaftId. On the first match, pNewCfg->myIndex
 * is set to that slot and 0 is returned; if no slot matches,
 * TSDB_CODE_SYN_INTERNAL_ERROR is returned and pNewCfg is left unchanged.
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;

  while (slot < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[slot]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }
    ++slot;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3967

3968
/*
 * True when the single-replica fast path applies: the node has exactly one replica,
 * the message type is a user commit (per syncUtilUserCommit), and the node's vgId is
 * not 1. The checks run in that order and stop at the first failure.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  // vgId 1 is explicitly excluded (presumably the management vgroup — confirm).
  return ths->vgId != 1;
}
3971

3972
/*
 * Report whether pRaftId is one of the node's replicas, i.e. matches any of the
 * first totalReplicaNum entries of ths->replicasId. The scan stops at the first match.
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;

  for (int32_t k = 0; !found && k < ths->totalReplicaNum; ++k) {
    found = syncUtilSameId(&ths->replicasId[k], pRaftId);
  }

  return found;
}
3980

3981
/*
 * Return the snapshot sender stored in the replica slot whose id equals pDestId, or
 * NULL if no slot matches. If the id occurs in several slots, the last one wins; the
 * slot's sender itself may also be NULL.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  // Scanning from the end and stopping at the first hit selects the same (last) slot
  // as a full forward scan would.
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return ths->senders[slot];
    }
  }
  return NULL;
}
3990

3991
/*
 * Return a pointer to the peer heartbeat timer in the replica slot whose id equals
 * pDestId, or NULL if no slot matches. If the id occurs in several slots, the last
 * one wins.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  // Reverse scan with early exit picks the same (last) matching slot as a forward scan.
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerHeartbeatTimerArr[slot];
    }
  }
  return NULL;
}
4000

4001
/*
 * Return a pointer to the peer state in the replica slot whose id equals pDestId, or
 * NULL if no slot matches. If the id occurs in several slots, the last one wins.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  // Reverse scan with early exit picks the same (last) matching slot as a forward scan.
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerStates[slot];
    }
  }
  return NULL;
}
4010

4011
#ifdef BUILD_NO_CALL
/*
 * Decide whether an AppendEntries message should go out to pDestId. Only compiled
 * when BUILD_NO_CALL is defined.
 *
 * Returns false (after logging) when the peer has no state slot. Also returns false
 * when the same index (prevLogIndex + 1) was last sent to this peer less than
 * SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Otherwise returns true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextIndex);
  bool sentRecently = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !(sameIndex && sentRecently);
}

/*
 * Report whether a membership change may start. Only compiled when BUILD_NO_CALL is
 * defined.
 *
 * Refuses (and logs) when:
 *   - a change is already in progress;
 *   - the commit index is valid but lags the last log index;
 *   - any peer has a started snapshot sender.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  // syncNodeGetLastIndex is only consulted once the commit index is valid.
  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSender != NULL && pSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc