• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4712

06 Sep 2025 04:27PM UTC coverage: 58.144% (-1.0%) from 59.134%
#4712

push

travis-ci

GitHub
test: update case description (#32878)

133123 of 291691 branches covered (45.64%)

Branch coverage included in aggregate %.

201244 of 283375 relevant lines covered (71.02%)

5637899.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.66
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Opens a sync node described by pSyncInfo, registers it in the node
// reference table and copies the timer baselines and message callbacks from
// pSyncInfo into the node.
// Returns the node's reference id, or -1 if the node could not be opened or
// registered.
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  // A negative id means registration failed; the node is closed again.
  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;
  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  // message callbacks
  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
14,387✔
88
  int32_t    code = 0;
14,387✔
89
  int32_t    vgId = 0;
14,387✔
90
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,387✔
91
  if (pSyncNode == NULL) {
14,387!
92
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
93
    if (terrno != 0) code = terrno;
×
94
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
95
    TAOS_RETURN(code);
×
96
  }
97
  vgId = pSyncNode->vgId;
14,387✔
98
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
14,387✔
99

100
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
14,387!
101
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
102
    goto _err;
×
103
  }
104
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
14,387✔
105

106
  if ((code = syncNodeStart(pSyncNode)) < 0) {
14,387!
107
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
108
    goto _err;
×
109
  }
110
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
14,387✔
111

112
  syncNodeRelease(pSyncNode);
14,387✔
113

114
  sInfo("vgId:%d, sync started", vgId);
14,387✔
115

116
  TAOS_RETURN(code);
14,387✔
117

118
_err:
×
119
  syncNodeRelease(pSyncNode);
×
120
  TAOS_RETURN(code);
×
121
}
122

123
// Copies the current raft configuration of the node identified by rid into
// *cfg. Returns 0 on success, or an error code when the node cannot be
// acquired.
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  // Prefer the thread-local error if one was recorded by the acquire.
  int32_t errCode = TSDB_CODE_SYN_RETURN_VALUE_NULL;
  if (terrno != 0) {
    errCode = terrno;
  }
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(errCode);
}
140

141
void syncStop(int64_t rid) {
14,387✔
142
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,387✔
143
  if (pSyncNode != NULL) {
14,387!
144
    pSyncNode->isStart = false;
14,387✔
145
    syncNodeRelease(pSyncNode);
14,387✔
146
    syncNodeRemove(rid);
14,387✔
147
  }
148
}
14,385✔
149

150
void syncPreStop(int64_t rid) {
14,387✔
151
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,387✔
152
  if (pSyncNode != NULL) {
14,387!
153
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
14,387!
154
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
155
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
156
    }
157
    syncNodePreClose(pSyncNode);
14,387✔
158
    syncNodeRelease(pSyncNode);
14,387✔
159
  }
160
}
14,387✔
161

162
void syncPostStop(int64_t rid) {
12,467✔
163
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
12,467✔
164
  if (pSyncNode != NULL) {
12,467!
165
    syncNodePostClose(pSyncNode);
12,467✔
166
    syncNodeRelease(pSyncNode);
12,467✔
167
  }
168
}
12,467✔
169

170
// A new configuration is accepted only if this node is part of it and the
// replica count differs from the current one by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int replicaDelta = pCfg->replicaNum - pSyncNode->replicaNum;
  return replicaDelta >= -1 && replicaDelta <= 1;
}
174

175
// Applies a new replica configuration to the node identified by rid.
//
// Returns 0 when the configuration is applied or is not newer than the
// current one; TSDB_CODE_SYN_NEW_CONFIG_ERROR when the new configuration is
// rejected or the change fails; otherwise the error code of the failing step.
//
// The node reference acquired here is released on every exit path, and the
// node is only touched (including for logging) while the reference is held.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Nothing to do when the stored configuration is already at least as new.
  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _OVER;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != 0) {
    goto _OVER;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _OVER;
  }

  // A leader re-initialises every peer heartbeat timer against the new
  // replica set and restarts heartbeating.
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != 0) {
      goto _OVER;
    }

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) {
        goto _OVER;
      }
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != 0) {
      goto _OVER;
    }
  }

_OVER:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
221

222
// Dispatches one sync RPC message to the handler matching pMsg->msgType on
// the node identified by rid.
//
// Returns the handler's result, TSDB_CODE_MSG_NOT_PROCESSED for an unknown
// message type, or an error code when the sync module is not initialised or
// the node cannot be acquired.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
  }

  // Log while the reference is still held: the node must not be read after
  // syncNodeRelease().
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
291

292
int32_t syncLeaderTransfer(int64_t rid) {
14,387✔
293
  int32_t    code = 0;
14,387✔
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,387✔
295
  if (pSyncNode == NULL) {
14,387!
296
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
297
    if (terrno != 0) code = terrno;
×
298
    TAOS_RETURN(code);
×
299
  }
300

301
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
14,387✔
302
  syncNodeRelease(pSyncNode);
14,387✔
303
  return ret;
14,387✔
304
}
305

306
// Forces this node into the follower role (with an all-zero leader id) and
// sends a success response for the request. Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId noLeader = {0};
  syncNodeBecomeFollower(ths, noLeader, "force election");

  // The response reuses the payload and connection info carried by the request.
  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
320

321
// Handles a "set assigned leader" request.
//
// The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its
// arbitration term is lower than the local one, or when it is not forced and
// its member token differs from the local arbitration token. Otherwise the
// node moves to the next term, becomes assigned leader (unless it already is)
// and appends a no-op entry.
//
// A response is always sent. It carries a payload only when the response was
// serialised successfully; on every failure path the payload is NULL with
// length 0.
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // A failed no-op append is only logged; the request is still answered
    // with success below.
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // never send a non-positive length with a NULL payload
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // Drop the freed buffer so the response below does not reference it.
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
403

404
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
405
  int32_t    code = 0;
×
406
  SSyncNode* pNode = syncNodeAcquire(rid);
×
407
  if (pNode == NULL) {
×
408
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
409
    if (terrno != 0) code = terrno;
×
410
    TAOS_RETURN(code);
×
411
  }
412

413
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
414
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
415
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
416

417
  syncNodeRelease(pNode);
×
418
  if (ret == 1) {
×
419
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
420
    code = rpcSendResponse(&rpcMsg);
×
421
    return code;
×
422
  } else {
423
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
424
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
425
  }
426
}
427

428
// Scans the match indexes of all peers and returns the smallest one, where
// the first peer's value seeds the result and later peers only lower it when
// their index is positive. Returns SYNC_INDEX_INVALID when there are no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex peerMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool      unset = (lowest == SYNC_INDEX_INVALID);
    if (unset || (peerMatch > 0 && peerMatch < lowest)) {
      lowest = peerMatch;
    }
  }

  return lowest;
}
441

442
// Forwards to the log store's syncLogIndexRetention callback with the given
// byte budget and returns its result.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retained = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retained;
}
445

446
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
26,214✔
447
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
26,214✔
448
  int32_t    code = 0;
26,214✔
449
  if (pSyncNode == NULL) {
26,214✔
450
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
6✔
451
    if (terrno != 0) code = terrno;
6!
452
    sError("sync begin snapshot error");
6!
453
    TAOS_RETURN(code);
6✔
454
  }
455

456
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
26,208✔
457
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
26,208✔
458
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
26,208✔
459

460
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
26,208!
461
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
95!
462
    syncNodeRelease(pSyncNode);
95✔
463
    return 0;
95✔
464
  }
465

466
  int64_t logRetention = 0;
26,113✔
467

468
  if (syncNodeIsMnode(pSyncNode)) {
26,113✔
469
    // mnode
470
    logRetention = tsMndLogRetention;
3,536✔
471
  } else {
472
    // vnode
473
    if (pSyncNode->replicaNum > 1) {
22,577✔
474
      logRetention = SYNC_VNODE_LOG_RETENTION;
879✔
475
    }
476
  }
477

478
  if (pSyncNode->totalReplicaNum > 1) {
26,113✔
479
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
1,323✔
480
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
132!
481
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
2!
482
              lastApplyIndex);
483
      syncNodeRelease(pSyncNode);
2✔
484
      return 0;
2✔
485
    }
486
    SyncIndex retentionIndex =
1,321✔
487
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
1,321✔
488
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
1,321✔
489
  }
490

491
_DEL_WAL:
24,790✔
492

493
  do {
494
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
26,111✔
495
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
26,111✔
496
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
26,111✔
497
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
26,111✔
498
    if (lastApplyIndex <= walCommitVer) {
26,111!
499
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
26,111✔
500

501
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
26,111!
502
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
26,111✔
503
        pSyncNode->snapshottingTime = taosGetTimestampMs();
26,110✔
504

505
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
26,110✔
506
        if (code == 0) {
26,111!
507
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
26,111✔
508
                  pSyncNode->snapshottingIndex, lastApplyIndex);
509
        } else {
510
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
511
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
512
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
513
        }
514

515
      } else {
516
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
517
                snapshottingIndex, lastApplyIndex);
518
      }
519
    }
520
  } while (0);
521

522
  syncNodeRelease(pSyncNode);
26,111✔
523
  TAOS_RETURN(code);
26,109✔
524
}
525

526
int32_t syncEndSnapshot(int64_t rid) {
26,205✔
527
  int32_t    code = 0;
26,205✔
528
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
26,205✔
529
  if (pSyncNode == NULL) {
26,208!
530
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
531
    if (terrno != 0) code = terrno;
×
532
    sError("sync end snapshot error");
×
533
    TAOS_RETURN(code);
×
534
  }
535

536
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
26,208✔
537
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
26,111✔
538
    code = walEndSnapshot(pData->pWal);
26,111✔
539
    if (code != 0) {
26,111!
540
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
541
      syncNodeRelease(pSyncNode);
×
542
      TAOS_RETURN(code);
×
543
    } else {
544
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
26,111✔
545
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
26,111✔
546
    }
547
  }
548

549
  syncNodeRelease(pSyncNode);
26,208✔
550
  TAOS_RETURN(code);
26,208✔
551
}
552

553
// Reports whether the node may serve reads: it must be leader or assigned
// leader and must have finished restoring. On a negative answer terrno is set
// to the reason.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool leading =
      (pSyncNode->state == TAOS_SYNC_STATE_LEADER) || (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!leading) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
572

573
bool syncIsReadyForRead(int64_t rid) {
2,606,264✔
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,606,264✔
575
  if (pSyncNode == NULL) {
2,606,953!
576
    sError("sync ready for read error");
×
577
    return false;
×
578
  }
579

580
  bool ready = syncNodeIsReadyForRead(pSyncNode);
2,606,953✔
581

582
  syncNodeRelease(pSyncNode);
2,606,862✔
583
  return ready;
2,606,126✔
584
}
585

586
#ifdef BUILD_NO_CALL
587
bool syncSnapshotSending(int64_t rid) {
588
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
589
  if (pSyncNode == NULL) {
590
    return false;
591
  }
592

593
  bool b = syncNodeSnapshotSending(pSyncNode);
594
  syncNodeRelease(pSyncNode);
595
  return b;
596
}
597

598
bool syncSnapshotRecving(int64_t rid) {
599
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
600
  if (pSyncNode == NULL) {
601
    return false;
602
  }
603

604
  bool b = syncNodeSnapshotRecving(pSyncNode);
605
  syncNodeRelease(pSyncNode);
606
  return b;
607
}
608
#endif
609

610
// Picks a peer and proposes a leader transfer to it. Only a leader with more
// than one replica transfers; every other case returns 0 without doing
// anything. The target is the first peer, unless there are exactly two peers
// and the second one has the higher match index.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}
631

632
// Builds a leader-transfer message naming newLeader and proposes it through
// syncNodePropose(). Returns TSDB_CODE_SYN_INTERNAL_ERROR for a
// single-replica node, the build error if the message cannot be built, and
// the propose result otherwise.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newNodeInfo = newLeader;
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t proposeCode = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  // The message buffer is freed here regardless of the propose result.
  rpcFreeCont(transferMsg.pCont);
  return proposeCode;
}
652

653
SSyncState syncGetState(int64_t rid) {
1,277,980✔
654
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
1,277,980✔
655

656
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,277,980✔
657
  if (pSyncNode != NULL) {
1,278,788✔
658
    state.state = pSyncNode->state;
1,278,745✔
659
    state.roleTimeMs = pSyncNode->roleTimeMs;
1,278,745✔
660
    state.startTimeMs = pSyncNode->startTime;
1,278,745✔
661
    state.restored = pSyncNode->restoreFinish;
1,278,745✔
662
    if (pSyncNode->vgId != 1) {
1,278,745✔
663
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
278,487✔
664
    } else {
665
      state.canRead = state.restored;
1,000,258✔
666
    }
667
    /*
668
    double progress = 0;
669
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
670
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
671
      state.progress = (int32_t)(progress * 100);
672
    }
673
    else{
674
      state.progress = -1;
675
    }
676
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
677
            "progress:%lf, progress:%d",
678
          pSyncNode->vgId,
679
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
680
    */
681
    state.term = raftStoreGetTerm(pSyncNode);
1,278,745✔
682
    syncNodeRelease(pSyncNode);
1,278,798✔
683
  }
684

685
  return state;
1,278,795✔
686
}
687

688
SSyncMetrics syncGetMetrics(int64_t rid) {
×
689
  SSyncMetrics metrics = {0};
×
690

691
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
692
  if (pSyncNode != NULL) {
×
693
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
694
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
695
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
696
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
697
    syncNodeRelease(pSyncNode);
×
698
  }
699
  return metrics;
×
700
}
701

702
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
703
  if (pOldMetrics == NULL) return;
×
704

705
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
706
  if (pSyncNode != NULL) {
×
707
    // Atomically subtract the old metrics values from current metrics
708
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
709
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
710
    syncNodeRelease(pSyncNode);
×
711
  }
712
}
713

714
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
265,914✔
715
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
265,914✔
716
  if (pSyncNode != NULL) {
265,914!
717
    *syncCommitIndex = pSyncNode->commitIndex;
265,914✔
718
    syncNodeRelease(pSyncNode);
265,914✔
719
  }
720
}
265,914✔
721

722
int32_t syncGetArbToken(int64_t rid, char* outToken) {
47,170✔
723
  int32_t    code = 0;
47,170✔
724
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
47,170✔
725
  if (pSyncNode == NULL) {
47,170!
726
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
727
    if (terrno != 0) code = terrno;
×
728
    TAOS_RETURN(code);
×
729
  }
730

731
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
47,170✔
732
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
47,170✔
733
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
47,170✔
734
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
47,170✔
735

736
  syncNodeRelease(pSyncNode);
47,170✔
737
  TAOS_RETURN(code);
47,170✔
738
}
739

740
int32_t syncCheckSynced(int64_t rid) {
10✔
741
  int32_t    code = 0;
10✔
742
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
10✔
743
  if (pSyncNode == NULL) {
10!
744
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
745
    if (terrno != 0) code = terrno;
×
746
    TAOS_RETURN(code);
×
747
  }
748

749
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
10!
750
    code = TSDB_CODE_SYN_NOT_LEADER;
×
751
    syncNodeRelease(pSyncNode);
×
752
    TAOS_RETURN(code);
×
753
  }
754

755
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
10✔
756
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
10!
757
  if (!isSync) {
10!
758
    sInfo("vgId:%d, not synced, assignedCommitIndex:%" PRId64 ", commitIndex:%" PRId64, pSyncNode->vgId,
×
759
          pSyncNode->assignedCommitIndex, pSyncNode->commitIndex);
760
  }
761

762
  syncNodeRelease(pSyncNode);
10✔
763
  TAOS_RETURN(code);
10✔
764
}
765

766
// Raises the arbitration term of the node identified by rid to arbTerm; the
// stored term never decreases.
// Returns 0 on success, or an error code when the node cannot be acquired.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t errCode = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) {
      errCode = terrno;
    }
    TAOS_RETURN(errCode);
  }

  int32_t  code = 0;
  SyncTerm newest = TMAX(arbTerm, pNode->arbTerm);
  pNode->arbTerm = newest;

  syncNodeRelease(pNode);
  TAOS_RETURN(code);
}
779

780
// Returns the largest recorded config-change index that does not exceed
// snapshotLastApplyIndex, falling back to the first recorded index when no
// later entry qualifies. Returns -2 and sets terrno when no config index is
// recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  int32_t count = pSyncNode->raftCfg.configIndexCount;
  if (count < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  // Entry 0 is the starting value, so the scan can begin at entry 1.
  for (int32_t pos = 1; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         best);

  return best;
}
800

801
// Find the raft id of the peer whose fqdn and port both equal those of pEp.
// Returns EMPTY_RAFT_ID when none of the node's peers matches.
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t peerIdx = 0;
  while (peerIdx < pSyncNode->peersNum) {
    bool sameHost = (strcmp(pEp->fqdn, pSyncNode->peersNodeInfo[peerIdx].nodeFqdn) == 0);
    if (sameHost && pEp->port == pSyncNode->peersNodeInfo[peerIdx].nodePort) {
      return pSyncNode->peersId[peerIdx];
    }
    ++peerIdx;
  }
  return EMPTY_RAFT_ID;
}
810

811
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
95,542✔
812
  if (pEpSet == NULL || buffer == NULL) {
95,542!
813
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
814
    return;
×
815
  }
816

817
  size_t offset = 0;
95,546✔
818
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
95,546✔
819

820
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
271,210✔
821
    if (offset >= bufferSize) break;
175,664!
822
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
175,664✔
823
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
175,664✔
824
  }
825

826
  if (offset < bufferSize) {
95,546!
827
    snprintf(buffer + offset, bufferSize - offset, "]");
95,546✔
828
  }
829
}
830

831
// Fill pEpSet with the endpoints to retry against for the sync node identified by rid.
// Replicas whose role is TAOS_SYNC_ROLE_LEARNER are left out. inUse is set to the
// slot of the cached leader endpoint when that endpoint is among the collected
// ones; otherwise to this node's config index when this node is the cached
// leader, or to the index after it (modulo numOfEps). The set is passed through
// epsetSort() before returning.
// When the node cannot be acquired, pEpSet is left with numOfEps == 0.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pNode->vgId, pNode->leaderCache.addr, pNode->leaderCacheEp.fqdn, pNode->leaderCacheEp.port);

  int leaderSlot = -1;  // slot in pEpSet->eps that matches the cached leader endpoint, if any
  int slot = 0;         // next free slot in pEpSet->eps
  for (int32_t replica = 0; replica < pNode->raftCfg.cfg.totalReplicaNum; ++replica) {
    if (pNode->raftCfg.cfg.nodeInfo[replica].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pNode->raftCfg.cfg.nodeInfo[replica].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pNode->raftCfg.cfg.nodeInfo[replica].nodePort;
    pEpSet->numOfEps++;

    // The raft id is looked up for the debug log only; the leader match below
    // compares endpoint fqdn and port.
    SRaftId peerId = syncGetRaftIdByEp(pNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pNode->vgId, replica, peerId.addr, pEp->fqdn,
           pEp->port);

    bool samePort = (pEp->port == pNode->leaderCacheEp.port);
    if (samePort && strncmp(pEp->fqdn, pNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pNode->myRaftId.addr == pNode->leaderCache.addr && pNode->myRaftId.vgId == pNode->leaderCache.vgId) {
      pEpSet->inUse = pNode->raftCfg.cfg.myIndex;
    } else {
      pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char epText[1024];
  epsetToString(pEpSet, epText, sizeof(epText));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pNode->vgId, pEpSet->numOfEps, epText,
         pEpSet->inUse);
  syncNodeRelease(pNode);
}
879

880
// Propose pMsg on the sync node identified by rid.
// Acquires the node, forwards the call to syncNodePropose() and releases the
// node again; the result of syncNodePropose() is returned unchanged.
// When the node cannot be acquired, returns terrno if it is non-zero, otherwise
// TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(code);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
894

895
int32_t syncCheckMember(int64_t rid) {
×
896
  int32_t    code = 0;
×
897
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
898
  if (pSyncNode == NULL) {
×
899
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
900
    if (terrno != 0) code = terrno;
×
901
    sError("sync propose error");
×
902
    TAOS_RETURN(code);
×
903
  }
904

905
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
906
    syncNodeRelease(pSyncNode);
×
907
    return TSDB_CODE_SYN_WRONG_ROLE;
×
908
  }
909

910
  syncNodeRelease(pSyncNode);
×
911
  return 0;
×
912
}
913

914
int32_t syncIsCatchUp(int64_t rid) {
4,726✔
915
  int32_t    code = 0;
4,726✔
916
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,726✔
917
  if (pSyncNode == NULL) {
4,726!
918
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
919
    if (terrno != 0) code = terrno;
×
920
    sError("sync Node Acquire error since %d", ERRNO);
×
921
    TAOS_RETURN(code);
×
922
  }
923

924
  int32_t isCatchUp = 0;
4,726✔
925
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
4,726!
926
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
937✔
927
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
933✔
928
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
4,453!
929
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
930
          pSyncNode->pLogBuf->matchIndex);
931
    isCatchUp = 0;
4,453✔
932
  } else {
933
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
273!
934
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
935
    isCatchUp = 1;
273✔
936
  }
937

938
  syncNodeRelease(pSyncNode);
4,726✔
939
  return isCatchUp;
4,726✔
940
}
941

942
ESyncRole syncGetRole(int64_t rid) {
4,726✔
943
  int32_t    code = 0;
4,726✔
944
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,726✔
945
  if (pSyncNode == NULL) {
4,726!
946
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
947
    if (terrno != 0) code = terrno;
×
948
    sError("sync Node Acquire error since %d", ERRNO);
×
949
    TAOS_RETURN(code);
×
950
  }
951

952
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
4,726✔
953

954
  syncNodeRelease(pSyncNode);
4,726✔
955
  return role;
4,726✔
956
}
957

958
int64_t syncGetTerm(int64_t rid) {
18,749✔
959
  int32_t    code = 0;
18,749✔
960
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,749✔
961
  if (pSyncNode == NULL) {
18,749!
962
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
963
    if (terrno != 0) code = terrno;
×
964
    sError("sync Node Acquire error since %d", ERRNO);
×
965
    TAOS_RETURN(code);
×
966
  }
967

968
  int64_t term = raftStoreGetTerm(pSyncNode);
18,749✔
969

970
  syncNodeRelease(pSyncNode);
18,749✔
971
  return term;
18,749✔
972
}
973

974
// Propose a client message on an already acquired sync node.
//   pSyncNode  node to propose on
//   pMsg       message to propose; on the optimized single-replica path its
//              info.conn.applyIndex / applyTerm are filled in
//   isWeak     forwarded to syncBuildClientRequest()
//   seq        optional out-parameter receiving the response sequence number
//              on the normal (queued) path
// Returns:
//   1  the message was handled on the optimized path and replicaNum is 1
//   0  the message was queued, or handled on the optimized path while
//      replicaNum is no longer 1
//   TSDB_CODE_SYN_NOT_LEADER          node is neither leader nor assigned leader
//   TSDB_CODE_SYN_RESTORING           restore has not finished
//   TSDB_CODE_SYN_PROPOSE_NOT_READY   heartbeat replies timed out
//   TSDB_CODE_SYN_INTERNAL_ERROR      the optimized path failed
//   other non-zero codes from serializing or enqueueing the request
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_RESTORING;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never prints an indeterminate value.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, serialize the request and enqueue it
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub, but keep reporting the serialization failure: returning the
    // result of the delete would turn a failed propose into success.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sWarn("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
            tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1046

1047
// Reset a per-peer heartbeat timer to its initial, not-yet-started state.
// The timer gets no handle, a zero counter and logic clock, the node's
// hbBaseLine as interval, syncNodeEqPeerHeartbeatTimer as callback, destId as
// target and the current time as timestamp. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
1057

1058
// Arm (or re-arm) the heartbeat timer pSyncTimer of pSyncNode.
// The timer's SSyncHbTimerData is looked up through hbDataRid and created when
// it does not exist yet; it is then filled with the node rid, destination,
// logic clock and expected execution time before the timer is reset.
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment
// is not initialized, and TSDB_CODE_OUT_OF_MEMORY when the timer data cannot
// be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // Allocation failed: bail out instead of dereferencing NULL below.
      sError("vgId:%d, failed to allocate hb timer data", pSyncNode->vgId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
         pData->destId.addr, pSyncTimer->timerMS);

  // The callback receives the rid of the timer data, not a pointer to it.
  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sWarn("vgId:%d, reset hb timer stopped:%d", pSyncNode->vgId, stopped);
  }
  return 0;
}
1090

1091
// Stop the heartbeat timer pSyncTimer and drop its timer data.
// The logic clock is advanced before the timer is stopped (presumably so that
// callbacks already scheduled can tell they are stale; confirm against the
// timer callback). Afterwards the timer handle is cleared and hbDataRid is
// reset to -1. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1101

1102
// Rebuild the log store from the FSM snapshot when the two no longer line up.
// A restore is triggered when the snapshot's lastApplyIndex lies outside the
// log range (last log index below it, or first log index above it + 1), or when
// the node's fsmState is SYNC_FSM_STATE_INCOMPLETE.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the FSM or its
// FpGetSnapshotInfo callback is missing; otherwise 0 or the error returned by
// syncLogRestoreFromSnapshot().
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0;
  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logOutOfRange = (lastVer < commitIndex) || (firstVer > commitIndex + 1);
  if (!logOutOfRange && pNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    // Log store already lines up with the snapshot.
    TAOS_RETURN(code);
  }

  sInfo("vgId:%d, restore log store from snapshot, firstVer:%" PRId64 ", lastVer:%" PRId64 ", commitIndex:%" PRId64,
        pNode->vgId, firstVer, lastVer, commitIndex);
  code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
  }
  TAOS_RETURN(code);
}
1134

1135
// open/close --------------
1136
// Allocate and initialize a sync node from pSyncInfo.
//   pSyncInfo     open options (path, vgId, raft config, callbacks, WAL, FSM)
//   vnodeVersion  config version of the request; the configuration in
//                 pSyncInfo only replaces the one stored in raft_config.json
//                 when this is newer than the stored changeVersion
// The raft config file under pSyncInfo->path is created when missing, otherwise
// read back. After that the function sets up the log buffer, replica/peer ids,
// raft store, vote and index managers, log store, timers, response manager,
// snapshot senders/receiver, replication managers and peer state.
// Ownership of pSyncInfo->pFsm moves to the node (pSyncInfo->pFsm is set to
// NULL) once the node adopts it; when a failure happens before that point the
// FSM is freed here.
// Returns the new node, or NULL on failure after releasing the partially built
// node through syncNodeClose(). Most failure paths also set terrno.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->mountVgId = pSyncInfo->mountVgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // Print the version that was actually compared above (the one read from the file).
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->mountVgId = pSyncInfo->mountVgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    if (code != 0) terrno = code;
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  // From here on the node owns the FSM; pSyncInfo->pFsm is cleared so that the
  // _error path does not free it a second time.
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
  pSyncNode->leaderCacheEp.port = 0;
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      // An incomplete FSM is only fatal for a single replica; with more
      // replicas the open continues.
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error so the partially built node is released instead of leaked.
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // Same as above: clean up through _error rather than returning directly.
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // The FSM is still owned by the caller's SSyncInfo only if the node never adopted it.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1503

1504
#ifdef BUILD_NO_CALL
1505
// Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is ahead.
// Does nothing when no FSM or no FpGetSnapshotInfo callback is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  // the callback's return value is intentionally discarded
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
1514
#endif
1515

1516
// Restore a freshly opened sync node.
//
// Reads the last and commit index from the log store (under the log buffer mutex), raises
// pSyncNode->commitIndex to the stored commit index, and - unless the FSM state is
// SYNC_FSM_STATE_INCOMPLETE - commits the log buffer up to that index.
//
// Returns:
//   TSDB_CODE_SYN_INTERNAL_ERROR  when the log store or the log buffer has not been created;
//   the result of syncLogBufferCommit when the commit is attempted;
//   otherwise 0, or TSDB_CODE_WAL_LOG_INCOMPLETE if the buffer end index did not match the log.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // report the code detected here; terrno has not been set on this path, so terrstr()
    // would print an unrelated error
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // warn only: restore keeps going after a mismatch
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore");
  }

  TAOS_RETURN(code);
}
1551

1552
// Start raft on an opened sync node.
//
// The initial role is chosen from the node's own entry in the raft config: a learner stays a
// learner; a voter in a single-replica group bumps the term, becomes leader and appends a noop
// entry; any other voter starts as a follower with an empty leader id. The ping timer is then
// started.
//
// Returns the failure of syncNodeAppendNoop if it fails, otherwise the result of
// syncNodeStartPingTimer.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  const bool startAsLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;

  if (startAsLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t timerCode = syncNodeStartPingTimer(pSyncNode);
  if (timerCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(timerCode));
  }

  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return timerCode;
}
1578

1579
#ifdef BUILD_NO_CALL
1580
// Put the node into follower state with an effectively disabled election timer (TIMER_MAX_MS)
// and start the ping timer.
//
// Returns 0 on success, or the negative error code of the step that failed.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code < 0) {
    // log and propagate the code returned by the callee; terrno is not set on this path
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }
  return code;
}
1603
#endif
1604

1605
// First phase of closing a sync node: stop the elect, heartbeat and ping timers (in that order)
// and clean pending responses in the response manager.
//
// Logs an error and returns without doing anything further when the node, its FSM or the FSM's
// FpApplyQueueItems callback is NULL, or as soon as one timer fails to stop.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1641

1642
// Closing phase that releases the snapshot receiver: stops it if it is running, destroys it and
// clears the pointer. A NULL node is ignored, matching syncNodePreClose and syncNodeClose.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
           pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }
}
12,467✔
1654

1655
// Release a heartbeat timer data object with taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,809!
1656

1657
// Tear down a sync node: clean pending responses, stop the ping/elect/heartbeat timers, destroy
// every manager, store and buffer the node holds, release the snapshot senders and receiver,
// free the FSM, close the raft store and finally free the node itself.
//
// A NULL node is ignored. If a timer fails to stop, the error is logged and the function
// returns immediately without releasing the remaining resources.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // timers
  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  syncNodeLogReplDestroy(pSyncNode);

  // managers, log store and log buffer; each pointer is cleared right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    if (pSyncNode->senders[slot] == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);

    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1726

1727
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1728

1729
// timer control --------------
1730
// (Re)arm the ping timer with pingTimerMS and publish the user logic clock as the timer's
// logic clock. When the sync environment is not initialized only an error is logged.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool tmrStopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                         syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (tmrStopped) {
    // a true result is only reported; the logic clock is still updated below
    sError("vgId:%d, failed to reset ping timer, ms:%d, stopped:%d", pSyncNode->vgId, pSyncNode->pingTimerMS,
           tmrStopped);
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1745

1746
// Stop the ping timer: advance the user logic clock, stop the timer handle and clear it.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool tmrStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, tmrStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1754

1755
// Arm the election timer to fire after `ms` milliseconds.
//
// Records the interval, the absolute execute time and the current elect logic clock in
// electTimerParam before resetting the timer. When the sync environment is not initialized
// only an error is logged. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool tmrStopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                                 syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (tmrStopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }

  return 0;
}
1774

1775
// Stop the election timer: advance the elect logic clock, stop the timer handle and clear it.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool tmrStopped = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, tmrStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1784

1785
// Stop the election timer and start it again with an interval of `ms` milliseconds.
// Returns 0 when both steps succeed; a failing step's code is returned by TAOS_CHECK_RETURN.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));

  return 0;
}
1791

1792
// Restart the election timer with a fresh interval: TIMER_MAX_MS for a standby node, otherwise
// a value obtained from syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
// A restart failure is logged as a warning.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1810

1811
#ifdef BUILD_NO_CALL
1812
// (Re)arm the node-level heartbeat timer with heartbeatTimerMS and publish the user logic clock
// as the timer's logic clock. When the sync environment is not initialized only an error is
// logged. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    // taosTmrReset yields a bool flag, not an error code, so it must not be routed through
    // TAOS_CHECK_RETURN; handle it like the ping and elect timers do (log and continue)
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sWarn("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1825
#endif
1826

1827
// Start the per-peer heartbeat timer of every peer that has one.
// Returns 0 when all timers start; a failing syncHbTimerStart is returned via TAOS_CHECK_RETURN
// and the remaining peers are not processed.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return 0;
}
1844

1845
// Stop the per-peer heartbeat timer of every peer that has one.
// Returns 0 when all timers stop; a failing syncHbTimerStop is returned via TAOS_CHECK_RETURN
// and the remaining peers are not processed.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return 0;
}
1864

1865
#ifdef BUILD_NO_CALL
1866
// Stop all per-peer heartbeat timers and start them again.
// Returns 0 when both steps succeed; a failing step's code is returned by TAOS_CHECK_RETURN.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));

  return 0;
}
1873
#endif
1874

1875
// Send a sync message to the peer whose raft address equals destRaftId->addr.
//
// The message content is converted with syncUtilMsgHtoN and marked noResp before it is handed
// to the node's syncSendMSg callback. When no matching peer exists or no callback is set the
// code stays -1. On any negative code the failure is logged and pMsg->pCont is released with
// rpcFreeCont.
//
// Returns the callback's result, or -1 when nothing was sent.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the destination peer
  SEpSet* pDestEpSet = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pDestEpSet = &pNode->peersEpset[peer];
      break;
    }
  }

  int32_t code = -1;
  if (pDestEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pDestEpSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           pDestEpSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1899

1900
// Tell whether this node is a member of the given configuration.
//
// Membership is checked twice: by endpoint (fqdn and port equal to myNodeInfo) and by raft id
// (address built with SYNC_ADDR plus this node's vgId, compared to myRaftId). When the two
// checks disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  // lookup by endpoint
  bool foundByEp = false;
  for (int32_t k = 0; k < pCfg->totalReplicaNum; ++k) {
    const bool sameFqdn = strcmp(pCfg->nodeInfo[k].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0;
    if (sameFqdn && pCfg->nodeInfo[k].nodePort == pNode->myNodeInfo.nodePort) {
      foundByEp = true;
      break;
    }
  }

  // lookup by raft id
  bool foundById = false;
  for (int32_t k = 0; k < pCfg->totalReplicaNum; ++k) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[k]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      foundById = true;
      break;
    }
  }

  if (foundByEp == foundById) {
    return foundByEp;
  }

  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return false;
}
1930

1931
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,326✔
1932
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,326✔
1933
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,240✔
1934
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,308✔
1935
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,793✔
1936
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,793✔
1937
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,793!
1938
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,793!
1939
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,793✔
1940
  }
1941

1942
  return false;
1,515✔
1943
}
1944

1945
// Apply a new raft configuration to the node.
//
// When the new configuration does not differ from the current one (per syncIsConfigChanged)
// nothing is done. Otherwise the stored configuration and lastConfigIndex are replaced, the
// change index is appended to configIndexArr and, if this node is a member of the new
// configuration, the peer tables, replica ids, quorum, index/vote managers and snapshot senders
// are rebuilt. Senders whose replica id survives the change are kept and re-indexed; the others
// are destroyed and fresh ones are created. The configuration is persisted with
// syncWriteCfgFile in both cases.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when configIndexArr is full;
// terrno when a node info cannot be converted into a raft id; or the failing code of
// snapshotSenderCreate / syncWriteCfgFile.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the actual vgroup instead of a hard-coded id
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node is being removed when it was a member before and is not one any more
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change; arguments are listed in the order of the conversion specifiers
  // (total replicas old/new, last index old/new, replica count of the new config)
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, (int64_t)oldConfig.lastIndex,
         (int64_t)pNewConfig->lastIndex, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they were indexed by
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: re-attach each surviving sender at the index of its replica in the new config
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; keeps it out of the "free old" pass below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2117

2118
// raft state change --------------
2119
// Store `term` as the current term when it is newer than the stored one; the node's role is
// left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
851✔
2124

2125
// Step down in reaction to a message carrying `newTerm` from node `id`.
//
// A term older than the stored one is ignored. An assigned leader regenerates its arb token
// (under arbTokenMutex). With a newer term the node stores it, becomes follower of `id` and
// clears its vote; with an equal term it becomes follower only if it is not one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < storedTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, storedTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, storedTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > storedTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);

    raftStoreClearVote(pSyncNode);
    return;
  }

  // same term: only a role change may be needed
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2155

2156
// On a leader change, clean the pending responses held by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
5,780✔
2157

2158
// Switch the node to follower state under the given leader.
//
// Updates the leader cache (raft id plus endpoint, when leaderId is one of the known replicas),
// stops the heartbeat timers, cleans pending client responses, invokes the FSM's
// FpBecomeFollowerCb if set, invalidates minMatchIndex, resets the log buffer and finally
// resets the election timer. If stopping the heartbeat timers or resetting the log buffer
// fails, the error is logged and the remaining steps are skipped.
//
// debugStr is a short reason that is only used for logging.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;

  pSyncNode->leaderCache = leaderId;

  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
    if (syncUtilSameId(&pSyncNode->replicasId[i], &leaderId)) {
      pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[i].nodePort;
      // bounded copy that always NUL-terminates (strncpy does not when the source fills the
      // buffer); the precision also caps how much of the source is read
      (void)snprintf(pSyncNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN, "%.*s", (int)(TSDB_FQDN_LEN - 1),
                     pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn);
      break;
    }
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2209

2210
// Switch the node to learner state: record the role change time, invoke the FSM's
// FpBecomeLearnerCb if set, invalidate minMatchIndex and reset the log buffer (a reset failure
// is logged). debugStr is a short reason that is only used for logging.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // trace log
  sNTrace(pSyncNode, "become learner %s", debugStr);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  int32_t rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
}
2235

2236
// TLA+ Spec
2237
// \* Candidate i transitions to leader.
2238
// BecomeLeader(i) ==
2239
//     /\ state[i] = Candidate
2240
//     /\ votesGranted[i] \in Quorum
2241
//     /\ state'      = [state EXCEPT ![i] = Leader]
2242
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2243
//                          [j \in Server |-> Len(log[i]) + 1]]
2244
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2245
//                          [j \in Server |-> 0]]
2246
//     /\ elections'  = elections \cup
2247
//                          {[eterm     |-> currentTerm[i],
2248
//                            eleader   |-> i,
2249
//                            elog      |-> log[i],
2250
//                            evotes    |-> votesGranted[i],
2251
//                            evoterLog |-> voterLog[i]]}
2252
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2253
//
2254
/*
 * Promote this node to leader (see the TLA+ BecomeLeader spec above).
 *
 * Steps, in order: bump the leader counters, mark restore as unfinished,
 * record the new state and its timestamp, publish this node in the leader
 * cache, set every nextIndex to lastIndex + 1 and every matchIndex to
 * SYNC_INDEX_INVALID, re-initialise per-peer send state, stop a running
 * snapshot receiver, stop the election timer, start the heartbeat timer,
 * heartbeat the peers immediately, notify the FSM, invalidate minMatchIndex
 * and reset the log buffer.
 *
 * Any failing step is logged and aborts the transition at that point; the
 * function has no return value, so callers cannot observe the failure.
 *
 * pSyncNode: node to promote.
 * debugStr:  free-form reason appended to the final "become leader" log line.
 */
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  // Use tstrncpy (the bounded copy this file already uses for fqdn fields) instead of strncpy:
  // strncpy leaves the destination unterminated when the source fills the whole buffer.
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
           TSDB_FQDN_LEN);
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;

  // every replica starts replicating right after our current last index
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    // reuse the function-level `code` rather than shadowing it with a second declaration
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2348

2349
/*
 * Put the node into the ASSIGNED_LEADER state.
 *
 * Mirrors the leader promotion sequence visible in this file, except that
 * restoreFinish is left untouched and only leaderCache (not the cached
 * endpoint) is updated. Each failing step is logged and ends the transition
 * early; nothing is returned.
 *
 * pSyncNode: node to promote.
 */
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;

  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  // pSyncNode->restoreFinish = false;

  // new role and the moment it was taken
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // this node is now the cached leader
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // next index of every replica := last index + 1
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // match index of every replica := invalid (our own slot included, which is harmless)
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // per-peer send state
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // a running snapshot receiver is stopped
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // election timer off
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // heartbeat timer on
  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // and one heartbeat immediately
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // FSM notification
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // log buffer reset comes last
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNInfo(pSyncNode, "become assigned leader");
}
2427

2428
/*
 * Candidate -> leader transition.
 *
 * Requires the node to be a candidate holding a majority of granted votes;
 * otherwise an error is logged and nothing changes. On success the node
 * becomes leader, a no-op entry is appended (a failure is logged only) and
 * the resulting term / commit index / last index are logged.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // the no-op append is best effort: log and carry on
  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
}
2452

2453
// Returns true when the node's vgId equals 1.
// NOTE(review): presumably vgId 1 is the id reserved for the mnode -- confirm against the vgId allocation code.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}
150,238✔
2454

2455
/*
 * Reset the per-peer send bookkeeping.
 *
 * Every slot of peerStates (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA
 * entries) gets lastSendIndex = SYNC_INDEX_INVALID and lastSendTime = 0.
 *
 * Returns 0 unconditionally.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int32_t slot = 0; slot < slotCount; ++slot) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
  }

  return 0;
}
2463

2464
/*
 * Follower -> candidate transition.
 *
 * Only valid from the FOLLOWER state; any other state is logged as an error
 * and left unchanged. On success the state and role timestamp are updated
 * and the current term / commit index / last log index are logged.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

  sNTrace(pSyncNode, "follower to candidate");
}
2477

2478
/*
 * Assigned-leader -> leader transition.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not currently an
 * assigned leader. Otherwise the node becomes leader, a no-op entry is
 * appended (a failure is logged but not returned) and 0 is returned.
 */
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " before "assigned commit index" keeps the two numeric fields from running together in the log
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2496

2497
// just called by syncNodeVoteForSelf
2498
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,345✔
2499
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,345✔
2500
  if (term != storeTerm) {
1,345!
2501
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2502
    return;
×
2503
  }
2504
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
1,345!
2505
  bool voted = raftStoreHasVoted(pSyncNode);
1,345✔
2506
  if (voted) {
1,345!
2507
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2508
    return;
×
2509
  }
2510

2511
  raftStoreVote(pSyncNode, pRaftId);
1,345✔
2512
}
2513

2514
// simulate get vote from outside
2515
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,345✔
2516
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,345✔
2517

2518
  SRpcMsg rpcMsg = {0};
1,345✔
2519
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,345✔
2520
  if (ret != 0) return;
1,345!
2521

2522
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,345✔
2523
  pMsg->srcId = pSyncNode->myRaftId;
1,345✔
2524
  pMsg->destId = pSyncNode->myRaftId;
1,345✔
2525
  pMsg->term = currentTerm;
1,345✔
2526
  pMsg->voteGranted = true;
1,345✔
2527

2528
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,345✔
2529
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,345✔
2530
  rpcFreeCont(rpcMsg.pCont);
1,345✔
2531
}
2532

2533
// return if has a snapshot
2534
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
19,317✔
2535
  bool      ret = false;
19,317✔
2536
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
19,317✔
2537
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
19,317!
2538
    // TODO check return value
2539
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
19,317✔
2540
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
19,317✔
2541
      ret = true;
2,520✔
2542
    }
2543
  }
2544
  return ret;
19,317✔
2545
}
2546

2547
// return max(logLastIndex, snapshotLastIndex)
2548
// if no snapshot and log, return -1
2549
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
19,371✔
2550
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
19,371✔
2551
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
19,371!
2552
    // TODO check return value
2553
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
19,371✔
2554
  }
2555
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
19,371✔
2556

2557
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
19,371✔
2558
  return lastIndex;
19,371✔
2559
}
2560

2561
// return the last term of snapshot and log
2562
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2563
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
19,317✔
2564
  SyncTerm lastTerm = 0;
19,317✔
2565
  if (syncNodeHasSnapshot(pSyncNode)) {
19,317✔
2566
    // has snapshot
2567
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,520✔
2568
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,520!
2569
      // TODO check return value
2570
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,520✔
2571
    }
2572

2573
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,520✔
2574
    if (logLastIndex > snapshot.lastApplyIndex) {
2,520✔
2575
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,447✔
2576
    } else {
2577
      lastTerm = snapshot.lastApplyTerm;
1,073✔
2578
    }
2579

2580
  } else {
2581
    // no snapshot
2582
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
16,797✔
2583
  }
2584

2585
  return lastTerm;
19,317✔
2586
}
2587

2588
// get last index and term along with snapshot
2589
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
16,886✔
2590
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
16,886✔
2591
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
16,886✔
2592
  return 0;
16,886✔
2593
}
2594

2595
#ifdef BUILD_NO_CALL
2596
// return append-entries first try index
2597
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2598
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2599
  return syncStartIndex;
2600
}
2601

2602
// if index > 0, return index - 1
2603
// else, return -1
2604
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2605
  SyncIndex preIndex = index - 1;
2606
  if (preIndex < SYNC_INDEX_INVALID) {
2607
    preIndex = SYNC_INDEX_INVALID;
2608
  }
2609

2610
  return preIndex;
2611
}
2612

2613
// if index < 0, return SYNC_TERM_INVALID
2614
// if index == 0, return 0
2615
// if index > 0, return preTerm
2616
// if error, return SYNC_TERM_INVALID
2617
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2618
  if (index < SYNC_INDEX_BEGIN) {
2619
    return SYNC_TERM_INVALID;
2620
  }
2621

2622
  if (index == SYNC_INDEX_BEGIN) {
2623
    return 0;
2624
  }
2625

2626
  SyncTerm  preTerm = 0;
2627
  SyncIndex preIndex = index - 1;
2628

2629
  SSyncRaftEntry* pPreEntry = NULL;
2630
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2631
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2632
  int32_t         code = 0;
2633
  if (h) {
2634
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2635
    code = 0;
2636

2637
    pSyncNode->pLogStore->cacheHit++;
2638
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2639

2640
  } else {
2641
    pSyncNode->pLogStore->cacheMiss++;
2642
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2643

2644
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2645
  }
2646

2647
  SSnapshot snapshot = {.data = NULL,
2648
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2649
                        .lastApplyTerm = SYNC_TERM_INVALID,
2650
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2651

2652
  if (code == 0) {
2653
    if (pPreEntry == NULL) return -1;
2654
    preTerm = pPreEntry->term;
2655

2656
    if (h) {
2657
      taosLRUCacheRelease(pCache, h, false);
2658
    } else {
2659
      syncEntryDestroy(pPreEntry);
2660
    }
2661

2662
    return preTerm;
2663
  } else {
2664
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2665
      // TODO check return value
2666
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2667
      if (snapshot.lastApplyIndex == preIndex) {
2668
        return snapshot.lastApplyTerm;
2669
      }
2670
    }
2671
  }
2672

2673
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2674
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2675
  return SYNC_TERM_INVALID;
2676
}
2677

2678
// get pre index and term of "index"
2679
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2680
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2681
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2682
  return 0;
2683
}
2684
#endif
2685

2686
static void syncNodeEqPingTimer(void* param, void* tmrId) {
61,973✔
2687
  if (!syncIsInit()) return;
61,973!
2688

2689
  int64_t    rid = (int64_t)param;
61,973✔
2690
  SSyncNode* pNode = syncNodeAcquire(rid);
61,973✔
2691

2692
  if (pNode == NULL) return;
61,973!
2693

2694
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
61,973!
2695
    SRpcMsg rpcMsg = {0};
61,973✔
2696
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
61,973✔
2697
                                    pNode->pingTimerMS, pNode);
2698
    if (code != 0) {
61,973!
2699
      sError("failed to build ping msg");
×
2700
      rpcFreeCont(rpcMsg.pCont);
×
2701
      goto _out;
×
2702
    }
2703

2704
    // sTrace("enqueue ping msg");
2705
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
61,973✔
2706
    if (code != 0) {
61,973✔
2707
      sError("failed to sync enqueue ping msg since %s", terrstr());
1!
2708
      rpcFreeCont(rpcMsg.pCont);
1✔
2709
      goto _out;
1✔
2710
    }
2711

2712
  _out:
61,972✔
2713
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
61,973!
2714
                     &pNode->pPingTimer))
2715
      sError("failed to reset ping timer");
×
2716
  }
2717
  syncNodeRelease(pNode);
61,973✔
2718
}
2719

2720
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,352✔
2721
  if (!syncIsInit()) return;
1,358!
2722

2723
  int64_t    rid = (int64_t)param;
1,352✔
2724
  SSyncNode* pNode = syncNodeAcquire(rid);
1,352✔
2725

2726
  if (pNode == NULL) return;
1,352✔
2727

2728
  if (pNode->syncEqMsg == NULL) {
1,348!
2729
    syncNodeRelease(pNode);
×
2730
    return;
×
2731
  }
2732

2733
  int64_t tsNow = taosGetTimestampMs();
1,348✔
2734
  if (tsNow < pNode->electTimerParam.executeTime) {
1,348✔
2735
    syncNodeRelease(pNode);
2✔
2736
    return;
2✔
2737
  }
2738

2739
  SRpcMsg rpcMsg = {0};
1,346✔
2740
  int32_t code =
2741
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,346✔
2742

2743
  if (code != 0) {
1,346!
2744
    sError("failed to build elect msg");
×
2745
    syncNodeRelease(pNode);
×
2746
    return;
×
2747
  }
2748

2749
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,346✔
2750
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,346!
2751

2752
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,346✔
2753
  if (code != 0) {
1,346!
2754
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2755
    rpcFreeCont(rpcMsg.pCont);
×
2756
    syncNodeRelease(pNode);
×
2757
    return;
×
2758
  }
2759

2760
  syncNodeRelease(pNode);
1,346✔
2761
}
2762

2763
#ifdef BUILD_NO_CALL
2764
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2765
  if (!syncIsInit()) return;
2766

2767
  int64_t    rid = (int64_t)param;
2768
  SSyncNode* pNode = syncNodeAcquire(rid);
2769

2770
  if (pNode == NULL) return;
2771

2772
  if (pNode->totalReplicaNum > 1) {
2773
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2774
      SRpcMsg rpcMsg = {0};
2775
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2776
                                      pNode->heartbeatTimerMS, pNode);
2777

2778
      if (code != 0) {
2779
        sError("failed to build heartbeat msg");
2780
        goto _out;
2781
      }
2782

2783
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2784
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2785
      if (code != 0) {
2786
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2787
        rpcFreeCont(rpcMsg.pCont);
2788
        goto _out;
2789
      }
2790

2791
    _out:
2792
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2793
                       &pNode->pHeartbeatTimer) != 0)
2794
        return;
2795

2796
    } else {
2797
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2798
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2799
    }
2800
  }
2801
}
2802
#endif
2803

2804
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
61,322✔
2805
  if (tsSyncLogHeartbeat) {
61,322!
2806
    sInfo("heartbeat timer start");
×
2807
  }
2808
  int32_t code = 0;
61,322✔
2809
  int64_t hbDataRid = (int64_t)param;
61,322✔
2810
  int64_t tsNow = taosGetTimestampMs();
61,322✔
2811

2812
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
61,322✔
2813
  if (pData == NULL) {
61,322!
2814
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2815
    return;
×
2816
  }
2817

2818
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
61,322✔
2819
  if (pSyncNode == NULL) {
61,322✔
2820
    syncHbTimerDataRelease(pData);
2✔
2821
    sError("hb timer get pSyncNode NULL");
2!
2822
    return;
2✔
2823
  }
2824

2825
  SSyncTimer* pSyncTimer = pData->pTimer;
61,320✔
2826

2827
  if (!pSyncNode->isStart) {
61,320!
2828
    syncNodeRelease(pSyncNode);
×
2829
    syncHbTimerDataRelease(pData);
×
2830
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2831
    return;
×
2832
  }
2833

2834
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
61,320!
2835
    syncNodeRelease(pSyncNode);
×
2836
    syncHbTimerDataRelease(pData);
×
2837
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2838
    return;
×
2839
  }
2840

2841
  if (tsSyncLogHeartbeat) {
61,320!
2842
    sInfo("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
×
2843
          pData->destId.addr);
2844
  } else {
2845
    sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
61,320!
2846
           pData->destId.addr);
2847
  }
2848

2849
  if (pSyncNode->totalReplicaNum > 1) {
61,320✔
2850
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
61,318✔
2851
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
61,318✔
2852

2853
    if (timerLogicClock == msgLogicClock) {
61,318✔
2854
      if (tsNow > pData->execTime) {
61,314✔
2855
        pData->execTime += pSyncTimer->timerMS;
61,289✔
2856

2857
        SRpcMsg rpcMsg = {0};
61,289✔
2858
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
61,289!
2859
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2860
          syncNodeRelease(pSyncNode);
×
2861
          syncHbTimerDataRelease(pData);
×
2862
          return;
×
2863
        }
2864

2865
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
61,289✔
2866

2867
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
61,289✔
2868
        pSyncMsg->srcId = pSyncNode->myRaftId;
61,289✔
2869
        pSyncMsg->destId = pData->destId;
61,289✔
2870
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
61,289✔
2871
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
61,289✔
2872
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
61,289✔
2873
        pSyncMsg->privateTerm = 0;
61,289✔
2874
        pSyncMsg->timeStamp = tsNow;
61,289✔
2875

2876
        // update reset time
2877
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
61,289✔
2878
        pSyncTimer->timeStamp = tsNow;
61,289✔
2879

2880
        // send msg
2881
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
61,289✔
2882
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
61,289✔
2883
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime, &(rpcMsg.info.traceId));
61,289✔
2884
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
61,289✔
2885
        if (ret != 0) {
61,289✔
2886
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
66!
2887
        }
2888
      }
2889

2890
      if (syncIsInit()) {
61,314!
2891
        if (tsSyncLogHeartbeat) {
61,314!
2892
          sInfo("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
×
2893
        } else {
2894
          sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
61,314!
2895
        }
2896
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
61,314✔
2897
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
61,314✔
2898
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
61,314!
2899

2900
      } else {
2901
        sError("sync env is stop, reset peer hb timer error");
×
2902
      }
2903

2904
    } else {
2905
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
4!
2906
             timerLogicClock, msgLogicClock);
2907
    }
2908

2909
    if (tsSyncLogHeartbeat) {
61,318!
2910
      sInfo("vgId:%d, finish send sync-heartbeat", pSyncNode->vgId);
×
2911
    }
2912
  }
2913

2914
  syncHbTimerDataRelease(pData);
61,320✔
2915
  syncNodeRelease(pSyncNode);
61,320✔
2916
  if (tsSyncLogHeartbeat) {
61,320!
2917
    sInfo("heartbeat timer stop");
×
2918
  }
2919
}
2920

2921
#ifdef BUILD_NO_CALL
2922
// LRU cache deleter callback: frees the cached value. key, keyLen and ud are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2926

2927
/*
 * Insert a raft entry into the log store's LRU cache, keyed by its index.
 *
 * The charge passed to the cache is sizeof(*pEntry) + pEntry->dataLen, the
 * priority is TAOS_LRU_PRIORITY_LOW and deleteCacheEntry is registered as the
 * deleter. The cache handle is handed back through h.
 *
 * Returns 0 when the cache reports TAOS_LRU_STATUS_OK, -1 otherwise.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   entryLen = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2941
#endif
2942

2943
/*
 * Fill a sync config from an alter-vnode-replica request.
 *
 * Voters from pReq->replicas occupy nodeInfo[0 .. replica), learners from
 * pReq->learnerReplicas follow directly after. replicaNum ends up as the
 * voter count and totalReplicaNum as voters + learners. myIndex is taken
 * from selfIndex, or from replica + learnerSelfIndex, whichever is not -1
 * (the learner index wins when both are set); it is left untouched when both
 * are -1. changeVersion is copied from the request.
 *
 * No bounds check is made against the capacity of cfg->nodeInfo.
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pVoter = &cfg->nodeInfo[i];

    pVoter->nodeId = pReq->replicas[i].id;
    pVoter->nodePort = pReq->replicas[i].port;
    tstrncpy(pVoter->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pVoter->nodeFqdn));
    pVoter->nodeRole = TAOS_SYNC_ROLE_VOTER;

    bool update = tmsgUpdateDnodeInfo(&pVoter->nodeId, &pVoter->clusterId, pVoter->nodeFqdn, &pVoter->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pVoter->nodeFqdn,
          pVoter->nodePort, pVoter->nodeId, pVoter->nodeRole, update);

    cfg->replicaNum++;
  }

  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: totalReplicaNum doubles as the running index into learnerReplicas
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo* pLearner = &cfg->nodeInfo[i];

    pLearner->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pLearner->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    pLearner->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pLearner->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pLearner->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pLearner->nodeId, &pLearner->clusterId, pLearner->nodeFqdn, &pLearner->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pLearner->nodeFqdn,
          pLearner->nodePort, pLearner->nodeId, pLearner->nodeRole, update);

    cfg->totalReplicaNum++;
  }

  // learners counted above, now add the voters
  cfg->totalReplicaNum += pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  cfg->changeVersion = pReq->changeVersion;
}
×
2979

2980
/*
 * Inspect a config-change entry and step down if this leader is no longer
 * part of the new configuration.
 *
 * Returns:
 *   TSDB_CODE_SYN_INTERNAL_ERROR  the entry is not a TDMT_SYNC_CONFIG_CHANGE
 *   TSDB_CODE_INVALID_MSG         the request payload cannot be deserialized
 *   1                             this node is leader / assigned leader, is
 *                                 absent from the new config (matched by
 *                                 fqdn and port) and has stepped down
 *   0                             otherwise
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // payload layout: SMsgHead followed by the serialized request
  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // only a (assigned) leader facing a non-empty config has anything to check
  bool leading = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (cfg.totalReplicaNum < 1 || !leading) {
    return 0;
  }

  for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
    bool sameHost = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0);
    if (sameHost && ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
      // still a member of the new config
      return 0;
    }
  }

  // not in the new config: give up leadership
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
3018

3019
// Dump the node's current membership view to the info log.
//
// Logged, in order: a summary line, this node's info, every peer's node info,
// every peer's endpoint set, every peer's raft id, every node of
// ths->raftCfg.cfg and every replica raft id.
//
// ths  sync node whose state is logged
// cfg  not read by this function (kept for interface compatibility)
// str  caller-supplied tag inserted into each log line
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    // Format the endpoints of peer i. The previous code shadowed `i` and always
    // read ths->peersEpset[0], so every peer was logged with peer 0's endpoints.
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3065

3066
// Rewrite the node's peer tables and ths->raftCfg.cfg from a new configuration.
//
// Pass 1 copies every entry of cfg that is not this node (different fqdn or
// port) into peersNodeInfo / peersEpset / peersId and sets peersNum.
// Pass 2 copies every entry of cfg into ths->raftCfg.cfg.nodeInfo, counts the
// voters into replicaNum, records myIndex when this node is found, and sets
// totalReplicaNum.
//
// Returns 0 on success, or terrno when a peer's raft id cannot be determined
// (in that case the tables are left partially updated).
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
    SNodeInfo* pSrc = &cfg->nodeInfo[k];

    bool isSelf =
        (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort);
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCnt];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCnt]);

    if (syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCnt]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t copied = 0;
  while (copied < cfg->totalReplicaNum) {
    SNodeInfo* pSrc = &cfg->nodeInfo[copied];
    SNodeInfo* pDst = &ths->raftCfg.cfg.nodeInfo[copied];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    // remember where this node sits in the new configuration
    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = copied;
    }

    copied++;
  }
  ths->raftCfg.cfg.totalReplicaNum = copied;

  return 0;
}
3114

3115
// Promote nodes to voter according to a new configuration.
//
// Every peer, and every node of ths->raftCfg.cfg, whose fqdn and port match an
// entry of cfg that carries TAOS_SYNC_ROLE_VOTER gets its role set to voter.
// ths->raftCfg.cfg.replicaNum is reset and incremented once per such match.
// Roles are never demoted here.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];

    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      SNodeInfo* pNew = &cfg->nodeInfo[c];

      if (strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) != 0) continue;
      if (pPeer->nodePort != pNew->nodePort) continue;
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t r = 0; r < ths->raftCfg.cfg.totalReplicaNum; ++r) {
    SNodeInfo* pMine = &ths->raftCfg.cfg.nodeInfo[r];

    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      SNodeInfo* pNew = &cfg->nodeInfo[c];

      if (strcmp(pMine->nodeFqdn, pNew->nodeFqdn) != 0) continue;
      if (pMine->nodePort != pNew->nodePort) continue;
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      pMine->nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3142

3143
// Rebuild the per-replica runtime structures after ths->raftCfg.cfg changed,
// carrying over the state of replicas that exist in both the old and the new
// configuration (matched by raft id).
//
// Steps: replicasId, pMatchIndex, pNextIndex, vote bookkeeping, log
// replication managers, snapshot senders, heartbeat timers and peerStates.
//
// ths                  sync node; raftCfg.cfg must already hold the new configuration
// oldtotalReplicaNum   number of replicas before the change
//
// Returns 0 on success, otherwise an error code. On failure the node is left
// partially rebuilt; nothing is rolled back.
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t          code = 0;
  SSyncLogReplMgr* oldLogReplMgrs = NULL;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // snapshot of the old managers by value; owned by this function and released
  // on every exit path taken after this point
  int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    goto _err;
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    // previously this path returned without releasing oldLogReplMgrs
    if (pSender == NULL) goto _err;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    goto _err;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      goto _err;
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    goto _err;
  }

  // 8.rebuild peerStates
  {
    SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
    for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
      oldState[i] = ths->peerStates[i];
    }

    for (int i = 0; i < ths->totalReplicaNum; i++) {
      for (int j = 0; j < oldtotalReplicaNum; j++) {
        if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
          ths->peerStates[i] = oldState[j];
        }
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);

  return 0;

_err:
  taosMemoryFree(oldLogReplMgrs);
  TAOS_RETURN(code);
}
3294

3295
// Apply a "learner -> voter" change to the runtime structures.
//
// Only the voter count changes: it is copied from ths->raftCfg.cfg.replicaNum
// into the node and into both index managers, then the vote bookkeeping is
// refreshed. Replica ids and log replication managers are left untouched.
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId, only need to change replicaNum when 1->3
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  int32_t r = 0;
  while (r < ths->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
    ++r;
  }

  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  int32_t m = 0;
  while (m < ths->pMatchIndex->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, m, ths->pMatchIndex->index[m]);
    ++m;
  }

  // pVotesGranted, pVotesRespond
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logRepMgrs: nothing to change when 1->3
}
×
3319

3320
// Zero out the first peersNum entries of peersNodeInfo and the first
// totalReplicaNum entries of ths->raftCfg.cfg.nodeInfo. The counters
// themselves are not modified.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  SNodeInfo blank = {0};
  int32_t   idx;

  for (idx = 0; idx < ths->peersNum; idx++) {
    memcpy(ths->peersNodeInfo + idx, &blank, sizeof(SNodeInfo));
  }

  for (idx = 0; idx < ths->raftCfg.cfg.totalReplicaNum; idx++) {
    memcpy(ths->raftCfg.cfg.nodeInfo + idx, &blank, sizeof(SNodeInfo));
  }
}
×
3330

3331
// Apply a config-change raft entry to this node.
//
// The entry payload is deserialized and converted to an SSyncCfg. Entries whose
// changeVersion is not newer than the node's current one are skipped. Depending
// on the new total replica count the change is handled as:
//   - 1 or 2 replicas: replica removal (either a peer, or this node itself, in
//     which case the node becomes a lone learner);
//   - otherwise, node currently has 3 replicas: replica type change (to voter);
//   - otherwise: replica addition.
// Afterwards quorum, lastConfigIndex and changeVersion are updated and the
// configuration is written to the sync cfg file.
//
// ths     sync node to reconfigure
// pEntry  raft entry; must have originalRpcType TDMT_SYNC_CONFIG_CHANGE
// str     origin tag used in log lines; the literal "Commit" selects the
//         commit-style log line
//
// Returns 0 on success or when skipped, TSDB_CODE_SYN_INTERNAL_ERROR for a
// wrong entry type, TSDB_CODE_INVALID_MSG when deserialization fails, or the
// error of the failing rebuild / cfg-file step.
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0;
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &alterReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&alterReq, &newCfg);

  // stale or duplicate change: ignore
  if (newCfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, newCfg.totalReplicaNum, newCfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  bool fromCommit = (strcmp(str, "Commit") == 0);
  if (fromCommit) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, newCfg.totalReplicaNum, newCfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index, pEntry->term,
        TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, newCfg.totalReplicaNum, newCfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &newCfg, "before config change");

  int32_t prevTotalReplicaNum = ths->totalReplicaNum;

  if (newCfg.totalReplicaNum == 1 || newCfg.totalReplicaNum == 2) {
    // ---- remove replica ----
    bool selfInCfg = false;
    for (int32_t k = 0; k < newCfg.totalReplicaNum && !selfInCfg; ++k) {
      selfInCfg = (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0 &&
                   ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort);
    }

    if (!selfInCfg) {
      // remove myself
      // no need to do anything actually, to change the following to reduce distruptive server chance
      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: only this node remains, as a non-voter
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // change other
      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    } else {
      // remove other; no need to change myNodeInfo
      syncNodeResetPeerAndCfg(ths);

      code = syncNodeRebuildPeerAndCfg(ths, &newCfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // ---- change replica type ----
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // change myNodeInfo
    for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
      bool isMe = (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0 &&
                   ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort);
      if (isMe && newCfg.nodeInfo[k].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // change peer and cfg
    syncNodeChangePeerAndCfgToVoter(ths, &newCfg);

    // change other
    syncNodeChangeToVoter(ths);

    // change state: a learner promoted to voter starts as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // ---- add replica ----
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // no need to change myNodeInfo

    // change peer and cfg
    code = syncNodeRebuildPeerAndCfg(ths, &newCfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // change other
    code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // no need to change state

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = newCfg.changeVersion;

  syncNodeLogConfigInfo(ths, &newCfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3504

3505
// Append a raft entry to the log buffer and advance match/commit state.
//
// An entry shorter than SMsgHead, or one the log buffer refuses, is destroyed
// here (in the latter case after syncFsmExecute is invoked with terrno). In
// all cases the log buffer is then asked to proceed. For an assigned leader
// the assigned commit index is updated and committed; with a single voter
// (replicaNum <= 1) the commit index is updated and committed immediately.
//
// ths     sync node
// pEntry  entry to append; destroyed by this function on the failure paths
// pMsg    originating rpc message, may be NULL; used for logging and proceed
//
// Returns 0 on success, otherwise an error code.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = -1;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    // append to log buffer failed
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    int32_t fsmCode = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (fsmCode != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index,
             tstrerror(fsmCode));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, with replicating on needed
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = NULL;

  if (pEntry != NULL) {
    trace = &pEntry->originRpcTraceId;
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
        sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
                pMsg, ths->commitIndex);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
  }

  // multi replica: commit is driven by replication, nothing more to do here
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry");
    if (code < 0) {
      sGError(trace,
              "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
              ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
    }
  }

  TAOS_RETURN(code);
}
3573

3574
// Tell whether at least `quorum` non-learner peers have not replied to
// heartbeats within tsHeartbeatTimeout milliseconds.
//
// Peers whose last receive time is 0 or -1 are not counted. Always false for a
// node whose totalReplicaNum is 1.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t expired = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    hasRecv = !(lastRecv == 0 || lastRecv == -1);

    if (hasRecv && now - lastRecv > tsHeartbeatTimeout) {
      expired++;
    }
  }

  return expired >= pSyncNode->quorum;
}
3599

3600
// Return true when any of the first totalReplicaNum snapshot senders exists
// and has its `start` flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t s = 0; s < pSyncNode->totalReplicaNum; ++s) {
    if (pSyncNode->senders[s] == NULL) continue;
    if (pSyncNode->senders[s]->start) return true;
  }

  return false;
}
3611

3612
// Return true when the node has a snapshot receiver whose `start` flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3618

3619
// Build a no-op entry at the log buffer's end index with the current term and
// append it via syncNodeAppend.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise
// the result of syncNodeAppend.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  termNow = raftStoreGetTerm(ths);
  int32_t   code = 0;

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(termNow, endIndex, ths->vgId);
  if (pNoop != NULL) {
    code = syncNodeAppend(ths, pNoop, NULL);
  } else {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  TAOS_RETURN(code);
}
3633

3634
#ifdef BUILD_NO_CALL
3635
// Legacy no-op append (only compiled under BUILD_NO_CALL).
//
// Builds a no-op entry at the log store's write index. When the node is
// leader the entry is appended to the log store and cached; the cache handle
// is released afterwards. When the entry was not cached it is destroyed here.
//
// Returns 0 on success, -1 when the entry cannot be built or appended.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry is still owned here (it was never cached); release it
      // instead of leaking it on this error path
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
3663
#endif
3664

3665
// Handle a heartbeat message received from a peer.
//
// The receive time/count for the sender is recorded in pNextIndex first.
// Heartbeats from a node outside this raft group are dropped. A learner adopts
// a higher term from the message. Then:
//   - same term, node is not (assigned) leader: remember minMatchIndex, arm an
//     election-timer reset, and (follower/learner only) enqueue a local commit
//     command carrying the sender's commit index;
//   - term >= current term while this node is (assigned) leader: enqueue a
//     local step-down command.
// Finally a heartbeat reply is sent back and, if armed, the election timer is
// reset.
//
// Returns 0 on success (including dropped messages), or the error of building
// a message or sending the reply.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff, pRpcMsg);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue sync-local-cmd msg(cmd=commit) from heartbeat since %s",
                 ths->vgId, tstrerror(code));
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue sync-local-cmd msg(cmd=commit) from heartbeat, commit-index:%" PRId64
                  ", term:%" PRId64,
                  ths->vgId, pMsg->commitIndex, pMsg->term);
        }
      } else {
        // no enqueue callback: the message was never handed off, so release
        // its content here instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down) error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down), new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // no enqueue callback: release the unsent message content
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    if (tsSyncLogHeartbeat) {
      sGInfo(&rpcMsg.info.traceId,
             "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
             ", processTime:%" PRId64,
             ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    } else {
      sGDebug(&rpcMsg.info.traceId,
              "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
              ", processTime:%" PRId64,
              ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    }
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3807

3808
// Handle a heartbeat reply received from a peer.
//
// Looks up the log replication manager for the sender, records the receive
// time/count in pMatchIndex and forwards the reply to
// syncLogReplProcessHeartbeatReply.
//
// Returns the result of syncLogReplProcessHeartbeatReply, or terrno /
// TSDB_CODE_SYN_RETURN_VALUE_NULL when no replication manager exists for the
// sender.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // width belongs inside the conversion: "0x%016" PRIx64 (was "0x016%", which
    // printed a literal "016" in front of the unpadded address)
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  syncIndexMgrIncRecvCount(ths->pMatchIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3829

3830
#ifdef BUILD_NO_CALL
3831
/*
 * Legacy heartbeat-reply handler (only compiled when BUILD_NO_CALL is defined).
 *
 * Logs the reply, passing the same elapsed time (now minus the timestamp
 * carried in the message) for both time arguments, stores the current time
 * as the peer's last receive time in ths->pMatchIndex, and always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  int64_t nowMs = taosGetTimestampMs();
  int64_t elapsedMs = nowMs - pReply->timeStamp;
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // remember when this peer last replied; used to decide whether the other node is alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}
3842
#endif
3843

3844
/*
 * Handle a local command message (pRpcMsg->pCont is a SyncLocalCmd).
 *
 *  - SYNC_LOCAL_CMD_STEP_DOWN: step down using the term carried in the message.
 *  - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: if the message
 *    term equals the log buffer's last match term, update the commit index;
 *    then, unless the FSM state is SYNC_FSM_STATE_INCOMPLETE, commit the log
 *    buffer up to ths->commitIndex. A commit failure is only logged.
 *  - anything else: log an error.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative,
 * and 0 in every other case (including an empty log buffer and unknown
 * commands).
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // commit command from here on
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
  }

  // no commit is attempted while the FSM state is incomplete
  if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
    return 0;
  }

  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}
3876

3877
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
3887

3888
/*
 * Turn a client request into a raft entry and append it to the log.
 *
 * The entry is built at the log buffer's end index with the current term.
 * For TDMT_SYNC_CONFIG_CHANGE messages a response stub holding a copy of
 * *pMsg is registered with ths->pSyncRespMgr and its sequence number is
 * stored in the entry.
 *
 * ths       sync node handling the request
 * pMsg      incoming request; msgType selects how the entry is built
 * pRetIndex optional out-parameter; when non-NULL and this node is leader
 *           (or assigned leader) it receives the index chosen for the entry
 *
 * Returns:
 *  - TSDB_CODE_SYN_INTERNAL_ERROR if the entry cannot be built or this node
 *    is neither leader nor assigned leader;
 *  - for config-change entries, the non-zero result of
 *    syncNodeCheckChangeConfig() (negative: check failed; positive: the
 *    stored response stub is fetched, answered if it has a handle, and the
 *    entry is dropped);
 *  - otherwise the result of syncNodeAppend().
 *
 * The entry is destroyed here on every early-return path after it was built.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
    // NOTE(review): this stub is only removed again on the "code > 0" path below; the non-leader and
    // "code < 0" returns leave it registered. Confirm that the response manager reclaims it.
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // only a leader (or assigned leader) may append client requests
    syncEntryDestroy(pEntry);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    // reuse the function-level `code`; a second declaration here used to shadow it
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // the entry will not be appended: answer the stored request, if any, and drop the entry
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry, pMsg);
  return code;
}
3952

3953
/*
 * Map a sync state to a constant, human-readable name.
 * Any value that is not one of the states listed below yields "unknown".
 */
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) return "follower";
  if (state == TAOS_SYNC_STATE_CANDIDATE) return "candidate";
  if (state == TAOS_SYNC_STATE_LEADER) return "leader";
  if (state == TAOS_SYNC_STATE_ERROR) return "error";
  if (state == TAOS_SYNC_STATE_OFFLINE) return "offline";
  if (state == TAOS_SYNC_STATE_LEARNER) return "learner";
  if (state == TAOS_SYNC_STATE_ASSIGNED_LEADER) return "assigned leader";
  return "unknown";
}
3973

3974
/*
 * Find this node inside a new configuration and record its position.
 *
 * Each of the first pNewCfg->totalReplicaNum entries of pNewCfg->nodeInfo is
 * turned into a raft id (address from SYNC_ADDR, vgId from ths) and compared
 * with ths->myRaftId. On the first match pNewCfg->myIndex is set to that
 * position and 0 is returned. If no entry matches, pNewCfg is left untouched
 * and TSDB_CODE_SYN_INTERNAL_ERROR is returned.
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;
  while (pos < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3989

3990
/*
 * Return true only when all three conditions hold, checked in this order:
 * the node has exactly one replica, syncUtilUserCommit() accepts the
 * message type, and the vgroup id is not 1.
 * NOTE(review): the reason vgId 1 is excluded is not visible here.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}
3993

3994
/*
 * Report whether pRaftId equals one of the first ths->totalReplicaNum
 * entries of ths->replicasId. The scan stops at the first match.
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t slot = 0;
  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(&((ths->replicasId)[slot]), pRaftId)) {
      return true;
    }
    slot++;
  }
  return false;
}
4002

4003
/*
 * Return the snapshot sender stored at the replica slot whose id equals
 * pDestId, or NULL when no slot matches.
 * All ths->totalReplicaNum slots are always visited; if more than one slot
 * matched, the last one would be returned.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;
  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = (ths->senders)[slot];
    }
    slot++;
  }
  return pFound;
}
4012

4013
/*
 * Return a pointer to the per-peer heartbeat timer stored at the replica
 * slot whose id equals pDestId, or NULL when no slot matches.
 * All ths->totalReplicaNum slots are always visited; if more than one slot
 * matched, the last one would be returned.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     slot = 0;
  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = &((ths->peerHeartbeatTimerArr)[slot]);
    }
    slot++;
  }
  return pFound;
}
4022

4023
/*
 * Return a pointer to the peer-state record stored at the replica slot
 * whose id equals pDestId, or NULL when no slot matches.
 * All ths->totalReplicaNum slots are always visited; if more than one slot
 * matched, the last one would be returned.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     slot = 0;
  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = &((ths->peerStates)[slot]);
    }
    slot++;
  }
  return pFound;
}
4032

4033
#ifdef BUILD_NO_CALL
4034
/*
 * Decide whether an append-entries message should be sent to pDestId.
 *
 * Returns false when no peer state exists for pDestId (an error is logged),
 * or when the index about to be sent (pMsg->prevLogIndex + 1) equals the
 * peer's lastSendIndex and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has
 * elapsed since its lastSendTime. Returns true otherwise.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // the same index was sent too recently: suppress the resend
  bool sentRecently =
      (pPeer->lastSendIndex == nextIndex) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !sentRecently;
}
4050

4051
/*
 * Check whether the node is currently allowed to change.
 *
 * Returns false (after logging which check failed) when:
 *  1. pSyncNode->changing is already set;
 *  2. commitIndex is at least SYNC_INDEX_BEGIN but differs from the last
 *     index reported by syncNodeGetLastIndex();
 *  3. any peer has a snapshot sender whose `start` flag is set.
 * Returns true when none of these apply.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
    if (lastIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    bool                 sending = (pSender != NULL) && pSender->start;
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
    peer++;
  }

  return true;
}
4075
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc