• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.65
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Opens a sync node from pSyncInfo, registers it via syncNodeAdd and copies the
// timer base lines and message callbacks from pSyncInfo into the node.
// Returns the registered rid on success, -1 if the node cannot be opened or
// registered.
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: tear the freshly opened node down again
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
14,227✔
88
  int32_t    code = 0;
14,227✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,227✔
90
  if (pSyncNode == NULL) {
14,227!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
14,227!
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
14,227!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
14,225!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
14,227✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
14,226!
111

112
  TAOS_RETURN(code);
14,227✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
// Copies the current raft configuration of the node identified by rid into *cfg.
// Returns 0 on success; if the node cannot be acquired, returns terrno when it
// is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
  if (terrno != 0) code = terrno;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(code);
}
136

137
void syncStop(int64_t rid) {
14,227✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,227✔
139
  if (pSyncNode != NULL) {
14,227!
140
    pSyncNode->isStart = false;
14,227✔
141
    syncNodeRelease(pSyncNode);
14,227✔
142
    syncNodeRemove(rid);
14,227✔
143
  }
144
}
14,224✔
145

146
void syncPreStop(int64_t rid) {
14,227✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,227✔
148
  if (pSyncNode != NULL) {
14,227!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
14,227!
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
152
    }
153
    syncNodePreClose(pSyncNode);
14,227✔
154
    syncNodeRelease(pSyncNode);
14,226✔
155
  }
156
}
14,226✔
157

158
void syncPostStop(int64_t rid) {
12,271✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
12,271✔
160
  if (pSyncNode != NULL) {
12,271!
161
    syncNodePostClose(pSyncNode);
12,271✔
162
    syncNodeRelease(pSyncNode);
12,270✔
163
  }
164
}
12,271✔
165

166
// A new config is accepted only if syncNodeInConfig holds for it and its
// replica count differs from the node's current replica count by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int replicaDelta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
    return replicaDelta <= 1;
  }
  return false;
}
170

171
// Applies a new sync configuration to the node identified by rid.
//
// Flow:
//   1. If the node's lastConfigIndex is already >= pNewCfg->lastIndex, nothing
//      is done and 0 is returned.
//   2. The new config is validated with syncNodeCheckNewConfig.
//   3. The config index is recorded and the config change is performed.
//   4. If the node is (assigned) leader, the heartbeat timer is stopped, every
//      peer heartbeat timer is re-initialised and the heartbeat timer is
//      started again.
//
// Returns 0 on success, TSDB_CODE_SYN_NEW_CONFIG_ERROR for an invalid config or
// a failed config change, or the non-zero code of whichever step failed. If the
// node cannot be acquired, returns terrno when set, otherwise
// TSDB_CODE_SYN_RETURN_VALUE_NULL.
//
// Every path after a successful acquire goes through exactly one
// syncNodeRelease, and the node is never read after it has been released
// (previously the early-return macros skipped the release and two log calls
// dereferenced the node after releasing it).
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // log while the reference is still held
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != 0) goto _exit;

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // restart heartbeating against the new replica set
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != 0) goto _exit;

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) goto _exit;
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != 0) goto _exit;
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
// Dispatches an incoming sync RPC message to the handler matching its msgType.
//
// Returns the handler's code, TSDB_CODE_MSG_NOT_PROCESSED for an unknown
// message type, or (when the sync module is not initialised or the node cannot
// be acquired) terrno if set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
//
// The failure debug message is emitted before syncNodeRelease so that
// pSyncNode->vgId is only read while the acquired reference is still held.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
  }

  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
14,226✔
289
  int32_t    code = 0;
14,226✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
14,226✔
291
  if (pSyncNode == NULL) {
14,227!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
14,227✔
298
  syncNodeRelease(pSyncNode);
14,227✔
299
  return ret;
14,227✔
300
}
301

302
// Forces the node into the follower role ("force election") and answers the
// request with a zero-code response built from the request's info. Always
// returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  // start from an all-zero message, then fill in the fields used for the reply
  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;

  tmsgSendRsp(&reply);
  return 0;
}
315

316
// Handles a "set assigned leader" request.
//
// The request is deserialized from the payload that follows the SMsgHead. It is
// skipped (answered with TSDB_CODE_MND_ARB_TOKEN_MISMATCH) when the local
// arbTerm is higher than the request's, or when the member token does not match
// the local arbToken. Otherwise, if the node is not already assigned leader, it
// advances the term, becomes assigned leader and appends a noop entry. A
// response is always sent back through tmsgSendRsp.
//
// Returns the code that was put into the response.
//
// Fixes relative to the previous version:
//   - When the second serialization pass fails, the freed response buffer is no
//     longer handed to tmsgSendRsp: pHead/contLen are reset after rpcFreeCont.
//   - A non-positive serialized length is no longer sent as contLen.
//   - The deserializer is given the payload length without the SMsgHead that
//     the buffer pointer already skips, and a message shorter than SMsgHead is
//     rejected as TSDB_CODE_INVALID_MSG.
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  int32_t                   bodyLen = pRpcMsg->contLen - (int32_t)sizeof(SMsgHead);
  if (bodyLen < 0 ||
      tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), bodyLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
    sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
          req.memberToken);
    goto _OVER;
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    // NOTE(review): this relies on terrno reflecting raftStoreNextTerm's
    // outcome; a stale terrno from an earlier call would also trip it — confirm.
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // a noop-append failure is logged only; the response is still built below
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  // first pass with a NULL buffer yields the required length
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // never send a non-positive length
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // no buffer, so no payload length either
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    pHead = NULL;  // the buffer is gone; do not hand a dangling pointer to the response
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
396

397
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
398
  int32_t    code = 0;
×
399
  SSyncNode* pNode = syncNodeAcquire(rid);
×
400
  if (pNode == NULL) {
×
401
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
402
    if (terrno != 0) code = terrno;
×
403
    TAOS_RETURN(code);
×
404
  }
405

406
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
407
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
408
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
409

410
  syncNodeRelease(pNode);
×
411
  if (ret == 1) {
×
412
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
413
    code = rpcSendResponse(&rpcMsg);
×
414
    return code;
×
415
  } else {
416
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
417
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
418
  }
419
}
420

421
// Returns the smallest match index over all peers of the node, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum unconditionally
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}
436

437
// Thin wrapper: forwards to the log store's syncLogIndexRetention callback
// with the given byte budget and returns its result.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retained = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retained;
}
440

441
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
29,455✔
442
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
29,455✔
443
  int32_t    code = 0;
29,455✔
444
  if (pSyncNode == NULL) {
29,455✔
445
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
13✔
446
    if (terrno != 0) code = terrno;
13!
447
    sError("sync begin snapshot error");
13!
448
    TAOS_RETURN(code);
13✔
449
  }
450

451
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
29,442✔
452
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
29,442✔
453
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
29,442✔
454

455
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
29,442!
456
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
84!
457
    syncNodeRelease(pSyncNode);
84✔
458
    return 0;
84✔
459
  }
460

461
  int64_t logRetention = 0;
29,358✔
462

463
  if (syncNodeIsMnode(pSyncNode)) {
29,358✔
464
    // mnode
465
    logRetention = tsMndLogRetention;
3,335✔
466
  } else {
467
    // vnode
468
    if (pSyncNode->replicaNum > 1) {
26,023✔
469
      logRetention = SYNC_VNODE_LOG_RETENTION;
1,399✔
470
    }
471
  }
472

473
  if (pSyncNode->totalReplicaNum > 1) {
29,358✔
474
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
1,720✔
475
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
140!
476
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
18!
477
              lastApplyIndex);
478
      syncNodeRelease(pSyncNode);
18✔
479
      return 0;
18✔
480
    }
481
    SyncIndex retentionIndex =
1,702✔
482
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
1,702✔
483
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
1,702✔
484
  }
485

486
_DEL_WAL:
27,638✔
487

488
  do {
489
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
29,340✔
490
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
29,340✔
491
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
29,340✔
492
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
29,340✔
493
    if (lastApplyIndex <= walCommitVer) {
29,340!
494
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
29,340✔
495

496
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
29,340!
497
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
29,340✔
498
        pSyncNode->snapshottingTime = taosGetTimestampMs();
29,340✔
499

500
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
29,340✔
501
        if (code == 0) {
29,340!
502
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
29,340✔
503
                  pSyncNode->snapshottingIndex, lastApplyIndex);
504
        } else {
505
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
506
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
507
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
508
        }
509

510
      } else {
511
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
512
                snapshottingIndex, lastApplyIndex);
513
      }
514
    }
515
  } while (0);
516

517
  syncNodeRelease(pSyncNode);
29,340✔
518
  TAOS_RETURN(code);
29,339✔
519
}
520

521
int32_t syncEndSnapshot(int64_t rid) {
29,442✔
522
  int32_t    code = 0;
29,442✔
523
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
29,442✔
524
  if (pSyncNode == NULL) {
29,442!
525
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
526
    if (terrno != 0) code = terrno;
×
527
    sError("sync end snapshot error");
×
528
    TAOS_RETURN(code);
×
529
  }
530

531
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
29,442✔
532
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
29,340✔
533
    code = walEndSnapshot(pData->pWal);
29,340✔
534
    if (code != 0) {
29,340!
535
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
536
      syncNodeRelease(pSyncNode);
×
537
      TAOS_RETURN(code);
×
538
    } else {
539
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
29,340✔
540
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
29,340✔
541
    }
542
  }
543

544
  syncNodeRelease(pSyncNode);
29,441✔
545
  TAOS_RETURN(code);
29,442✔
546
}
547

548
// A node is ready for reads when it is leader or assigned leader and has
// finished restoring. On a false result terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER or
// TSDB_CODE_SYN_RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool leading = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) || (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!leading) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
567

568
bool syncIsReadyForRead(int64_t rid) {
2,841,932✔
569
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,841,932✔
570
  if (pSyncNode == NULL) {
2,843,088!
571
    sError("sync ready for read error");
×
572
    return false;
×
573
  }
574

575
  bool ready = syncNodeIsReadyForRead(pSyncNode);
2,843,088✔
576

577
  syncNodeRelease(pSyncNode);
2,842,997✔
578
  return ready;
2,843,009✔
579
}
580

581
#ifdef BUILD_NO_CALL
// Returns the result of syncNodeSnapshotSending for the node identified by rid;
// false when the node cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);
  return sending;
}

// Returns the result of syncNodeSnapshotRecving for the node identified by rid;
// false when the node cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);
  return receiving;
}
#endif
604

605
// Picks a peer and starts a leader transfer to it.
// Returns 0 without doing anything when the node has no peers, is not the
// leader, or has a single replica. With exactly two peers the one with the
// higher match index is chosen (the first peer on a tie); otherwise the first
// peer is used. Returns the result of syncNodeLeaderTransferTo.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  SNodeInfo target = (pSyncNode->peersNodeInfo)[0];
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = (pSyncNode->peersNodeInfo)[1];
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, target);
}
626

627
// Builds a leader-transfer message naming newLeader and proposes it through
// syncNodePropose. Returns TSDB_CODE_SYN_INTERNAL_ERROR for a single-replica
// node, the build error if the message cannot be built, otherwise the result
// of the proposal. The message content is freed here after proposing.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  // fill in the target of the transfer
  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newNodeInfo = newLeader;

  int32_t proposeRet = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return proposeRet;
}
647

648
SSyncState syncGetState(int64_t rid) {
1,991,518✔
649
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
1,991,518✔
650

651
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,991,518✔
652
  if (pSyncNode != NULL) {
1,991,739✔
653
    state.state = pSyncNode->state;
1,991,732✔
654
    state.roleTimeMs = pSyncNode->roleTimeMs;
1,991,732✔
655
    state.startTimeMs = pSyncNode->startTime;
1,991,732✔
656
    state.restored = pSyncNode->restoreFinish;
1,991,732✔
657
    if (pSyncNode->vgId != 1) {
1,991,732✔
658
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
440,052✔
659
    } else {
660
      state.canRead = state.restored;
1,551,680✔
661
    }
662
    /*
663
    double progress = 0;
664
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
665
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
666
      state.progress = (int32_t)(progress * 100);
667
    }
668
    else{
669
      state.progress = -1;
670
    }
671
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
672
            "progress:%lf, progress:%d",
673
          pSyncNode->vgId,
674
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
675
    */
676
    state.term = raftStoreGetTerm(pSyncNode);
1,991,729✔
677
    syncNodeRelease(pSyncNode);
1,991,738✔
678
  }
679

680
  return state;
1,991,732✔
681
}
682

683
int32_t syncGetArbToken(int64_t rid, char* outToken) {
11,686✔
684
  int32_t    code = 0;
11,686✔
685
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
11,686✔
686
  if (pSyncNode == NULL) {
11,686!
687
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
688
    if (terrno != 0) code = terrno;
×
689
    TAOS_RETURN(code);
×
690
  }
691

692
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
11,686✔
693
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
11,686✔
694
  strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
11,686✔
695
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
11,686✔
696

697
  syncNodeRelease(pSyncNode);
11,686✔
698
  TAOS_RETURN(code);
11,686✔
699
}
700

701
int32_t syncGetAssignedLogSynced(int64_t rid) {
×
702
  int32_t    code = 0;
×
703
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
704
  if (pSyncNode == NULL) {
×
705
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
706
    if (terrno != 0) code = terrno;
×
707
    TAOS_RETURN(code);
×
708
  }
709

710
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
×
711
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
712
    syncNodeRelease(pSyncNode);
×
713
    TAOS_RETURN(code);
×
714
  }
715

716
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
×
717
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
×
718

719
  syncNodeRelease(pSyncNode);
×
720
  TAOS_RETURN(code);
×
721
}
722

723
// Raises the node's arbTerm to arbTerm when the given value is larger; the
// stored term never decreases. Returns 0 on success; if the node cannot be
// acquired, returns terrno when set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t failCode = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) failCode = terrno;
    TAOS_RETURN(failCode);
  }

  pNode->arbTerm = TMAX(arbTerm, pNode->arbTerm);
  syncNodeRelease(pNode);

  int32_t code = 0;
  TAOS_RETURN(code);
}
736

737
// Scans the recorded config indexes in order, starting from the first entry,
// and advances the result to any entry that is greater than the current result
// and not greater than snapshotLastApplyIndex. Returns that result, or -2 (with
// terrno set to TSDB_CODE_SYN_INTERNAL_ERROR) when no config index is recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex chosen = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= chosen) continue;                  // not newer than what we have
    if (candidate > snapshotLastApplyIndex) continue;   // beyond the snapshot
    chosen = candidate;
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, chosen);

  return chosen;
}
757

758
// Fill `pEpSet` with the endpoints of all non-learner replicas of the sync node
// identified by `rid`, so a client can retry against them.
// `pEpSet->numOfEps` is reset to 0 first and stays 0 if the node cannot be acquired.
// When at least one endpoint is collected, `inUse` is set to
// (myIndex + 1) % numOfEps before the set is passed to epsetSort().
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return;
  }

  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole != TAOS_SYNC_ROLE_LEARNER) {
      // The next free slot is always the number of endpoints collected so far.
      SEp* pEp = &pEpSet->eps[pEpSet->numOfEps];
      tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
      pEp->port = pInfo->nodePort;
      pEpSet->numOfEps++;
      sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
    }
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
783

784
// Public entry point for proposing a message to the sync node identified by `rid`.
// Acquires the node, forwards to syncNodePropose(), releases the node and returns
// whatever syncNodePropose() returned. If the node cannot be acquired, returns
// terrno when it is non-zero, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(errCode);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
798

799
int32_t syncCheckMember(int64_t rid) {
×
800
  int32_t    code = 0;
×
801
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
802
  if (pSyncNode == NULL) {
×
803
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
804
    if (terrno != 0) code = terrno;
×
805
    sError("sync propose error");
×
806
    TAOS_RETURN(code);
×
807
  }
808

809
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
810
    syncNodeRelease(pSyncNode);
×
811
    return TSDB_CODE_SYN_WRONG_ROLE;
×
812
  }
813

814
  syncNodeRelease(pSyncNode);
×
815
  return 0;
×
816
}
817

818
int32_t syncIsCatchUp(int64_t rid) {
4,828✔
819
  int32_t    code = 0;
4,828✔
820
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,828✔
821
  if (pSyncNode == NULL) {
4,828!
822
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
823
    if (terrno != 0) code = terrno;
×
824
    sError("sync Node Acquire error since %d", errno);
×
825
    TAOS_RETURN(code);
×
826
  }
827

828
  int32_t isCatchUp = 0;
4,828✔
829
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
4,828!
830
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
788!
831
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
788✔
832
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
4,556!
833
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
834
          pSyncNode->pLogBuf->matchIndex);
835
    isCatchUp = 0;
4,556✔
836
  } else {
837
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
272!
838
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
839
    isCatchUp = 1;
272✔
840
  }
841

842
  syncNodeRelease(pSyncNode);
4,828✔
843
  return isCatchUp;
4,828✔
844
}
845

846
ESyncRole syncGetRole(int64_t rid) {
4,828✔
847
  int32_t    code = 0;
4,828✔
848
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,828✔
849
  if (pSyncNode == NULL) {
4,828!
850
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
851
    if (terrno != 0) code = terrno;
×
852
    sError("sync Node Acquire error since %d", errno);
×
853
    TAOS_RETURN(code);
×
854
  }
855

856
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
4,828✔
857

858
  syncNodeRelease(pSyncNode);
4,828✔
859
  return role;
4,828✔
860
}
861

862
int64_t syncGetTerm(int64_t rid) {
3,702✔
863
  int32_t    code = 0;
3,702✔
864
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
3,702✔
865
  if (pSyncNode == NULL) {
3,702!
866
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
867
    if (terrno != 0) code = terrno;
×
868
    sError("sync Node Acquire error since %d", errno);
×
869
    TAOS_RETURN(code);
×
870
  }
871

872
  int64_t term = raftStoreGetTerm(pSyncNode);
3,702✔
873

874
  syncNodeRelease(pSyncNode);
3,702✔
875
  return term;
3,702✔
876
}
877

878
// Propose a client message on an already-acquired sync node.
//
// Rejections (returned via TAOS_RETURN):
//   - TSDB_CODE_SYN_NOT_LEADER when the node is neither leader nor assigned leader;
//   - TSDB_CODE_SYN_PROPOSE_NOT_READY while restore has not finished, or when a
//     (non-assigned) leader has timed out waiting for heartbeat replies.
//
// Optimized single-replica path (syncNodeIsOptimizedOneReplica):
//   the request is handled inline by syncNodeOnClientRequest(); on success the
//   apply index/term are stored in pMsg->info.conn and the function returns 1 if
//   replicaNum is still 1, or 0 if membership has changed in the meantime.
//   A negative result is reported as TSDB_CODE_SYN_INTERNAL_ERROR.
//
// Normal path:
//   a response stub is registered, the request is serialized and enqueued via
//   pSyncNode->syncEqMsg. `*seq` (when non-NULL) receives the stub sequence number.
//   Returns 0 on success or the serialization/enqueue error.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate value.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
           " type:%s, "
           "handle:%p",
           pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, then serialize and enqueue the request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub, but keep reporting the serialization error: the result of
    // the cleanup must not turn a failed propose into a success.
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    TAOS_RETURN(code);
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
950

951
// Put a per-peer heartbeat timer slot into its initial, not-yet-started state.
// The interval comes from pSyncNode->hbBaseLine, the callback is
// syncNodeEqPeerHeartbeatTimer, and the logic clock starts at 0.
// Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // what the timer does and whom it targets
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state: no timer handle yet, counters cleared
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
961

962
// Arm (or re-arm) a per-peer heartbeat timer.
// The timer's callback argument is the rid of an SSyncHbTimerData record; an
// existing record is reused when pSyncTimer->hbDataRid still resolves, otherwise
// a new one is allocated and registered with syncHbTimerDataAdd().
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the record cannot be
// allocated, TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is not
// initialized, or the failure reported through TAOS_CHECK_RETURN by taosTmrReset().
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // Without this check the allocation failure is dereferenced just below.
        sError("vgId:%d, failed to allocate hb timer data", pSyncNode->vgId);
        TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    // snapshot of the timer state that is handed to the callback via the record
    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
           pData->destId.addr, pSyncTimer->timerMS);

    TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                   syncEnv()->pTimerManager, &pSyncTimer->pTimer));
  } else {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return code;
}
991

992
// Stop a per-peer heartbeat timer.
// The logic clock is incremented before the timer is stopped, then the timer
// handle is cleared and the associated timer-data record is removed
// (hbDataRid is reset to -1). Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1002

1003
// Re-base the log store on the FSM snapshot when the two do not line up.
// A restore from snapshot is triggered when the log ends before the snapshot's
// last applied index, when the log begins after (last applied index + 1), or
// when the FSM state is SYNC_FSM_STATE_INCOMPLETE.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR if the log store, the FSM or its
// FpGetSnapshotInfo callback is missing; otherwise 0 or the restore error.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snap = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);

  SyncIndex snapshotVer = snap.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logOutOfRange = (lastVer < snapshotVer) || (firstVer > snapshotVer + 1);
  bool needRestore = logOutOfRange || (pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE);

  int32_t code = 0;
  if (needRestore) {
    code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
    if (code != 0) {
      sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
             pNode->vgId, terrstr(), lastVer, snapshotVer);
    }
  }
  TAOS_RETURN(code);
}
1033

1034
// open/close --------------
1035
// Allocate and initialize a sync (raft) node from `pSyncInfo`.
//
// Steps, in order:
//   1. create the sync directory if needed and derive the raft store / config paths;
//   2. create raft_config.json from `pSyncInfo`, or load it and reconcile it with
//      the input config depending on `vnodeVersion` vs. the stored changeVersion;
//   3. refresh dnode info of every replica and persist the config if it changed;
//   4. copy callbacks, create the log ring buffer, derive replica / peer ids;
//   5. take ownership of pSyncInfo->pFsm (pSyncInfo->pFsm is set to NULL);
//   6. open the raft store and create vote, index and log-store managers;
//   7. derive the initial commit index from the FSM snapshot and restore the log
//      store if needed;
//   8. initialize timers, response manager, snapshot senders/receiver,
//      replication managers, peer state and the log buffer.
//
// Returns the opened node, or NULL on failure. Every failure goes through
// `_error`, which frees a not-yet-transferred pSyncInfo->pFsm and hands the
// partially built node to syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // Report the version that was actually compared against (the one loaded from file).
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  // ownership of the FSM moves to the node here; _error must not free it twice
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      // an incomplete FSM is only fatal when this is the sole replica
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      // was `errno = code`; every other failure path here reports through terrno
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // A bare `return NULL` here leaked the node and everything created so far.
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, replica index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // Same as above: clean up through _error instead of returning directly.
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // pSyncInfo->pFsm is non-NULL only if ownership was not yet handed to the node
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // NOTE(review): relies on syncNodeClose() accepting a NULL or partially
  // initialized node, as the pre-existing error paths already do — confirm.
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1398

1399
#ifdef BUILD_NO_CALL
1400
// Advance pSyncNode->commitIndex to the FSM snapshot's last applied index when
// the snapshot is ahead. Does nothing if the FSM or its FpGetSnapshotInfo
// callback is not set.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  // commit index only ever moves forward here
  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
1409
#endif
1410

1411
// Bring the node's commit index up to what the log store reports and, unless
// the FSM state is SYNC_FSM_STATE_INCOMPLETE, commit the log buffer up to it.
//
// A mismatch between the log buffer's endIndex and the log store's last index
// is only warned about (code = TSDB_CODE_WAL_LOG_INCOMPLETE) and does not stop
// the restore; that code is what gets returned if the commit step is skipped.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or the log buffer is
// missing, otherwise the result described above.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // read the three indexes under the log buffer mutex
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Print the code assigned just above; terrstr() would show whatever terrno
    // happened to hold from an earlier, unrelated call.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // deliberately not returning here: the restore continues
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  }

  TAOS_RETURN(code);
}
1446

1447
// Put a freshly opened sync node into its initial raft role and start its ping timer.
//   - configured as learner          -> become learner;
//   - single replica (replicaNum==1) -> advance the term, become leader and append
//                                       a no-op entry (returns early if that fails);
//   - otherwise                      -> become follower.
// Returns the result of syncNodeStartPingTimer(), or the no-op append failure.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  // start raft
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    syncNodeBecomeFollower(pSyncNode, "first start");
  }

  int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  if (pingCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(pingCode));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return pingCode;
}
1472

1473
#ifdef BUILD_NO_CALL
1474
// Put the node into follower state as a stand-by: stop the heartbeat timers, arm the elect timer
// with TIMER_MAX_MS so that it effectively never fires, and start the ping timer.
// Returns 0 on success, otherwise the error code of the step that failed.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (code < 0) {
    // report and propagate the actual failure instead of a bare -1
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }
  return code;
}
1497
#endif
1498

1499
// First phase of closing a sync node: stop the elect, heartbeat and ping timers (in that order) and
// then clean the pending responses held by the response manager.
// Logs an error and returns without doing anything further when the node, its fsm or the fsm's
// FpApplyQueueItems callback is NULL, or as soon as one of the timers fails to stop.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  // stop elect timer
  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // stop heartbeat timer
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // stop ping timer
  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1535

1536
// Release the node's snapshot receiver, if it has one: stop it when it is still started, destroy
// it, and clear the pointer so it cannot be destroyed a second time.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
12,271✔
1548

1549
// Free a heartbeat timer data object through taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,369✔
1550

1551
// Tear down a sync node and free it. No-op when pSyncNode is NULL.
// Sequence: clean pending responses, stop the ping / elect / heartbeat timers, destroy the log
// replication managers, response manager, vote and index managers, log store and log buffer,
// destroy the arb token mutex, release every snapshot sender and the snapshot receiver, free the
// fsm, close the raft store and finally free the node itself.
// If any timer fails to stop the function logs the error and returns early, leaving the node and
// everything it owns untouched.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  syncNodeLogReplDestroy(pSyncNode);

  // destroy each owned component and clear its pointer right away
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders: one slot per possible replica / learner replica
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    if (pSyncNode->senders[slot] == NULL) {
      continue;
    }
    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);

    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1620

1621
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1622

1623
// timer control --------------
1624
// (Re)arm the ping timer with the node's pingTimerMS and publish the user logic clock as the
// current ping timer logic clock.
// When the sync environment is not initialised only an error is logged and 0 is still returned.
// A failing taosTmrReset is propagated through TAOS_CHECK_RETURN.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                 syncEnv()->pTimerManager, &pSyncNode->pPingTimer));
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1635

1636
// Stop the ping timer: bump the user logic clock, stop the timer handle and forget it.
// Always returns 0; the result of taosTmrStop is only logged.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool stopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1644

1645
// Arm the elect timer to fire after `ms` milliseconds.
// Records the timeout on the node, fills electTimerParam (expected execute time, current elect
// logic clock, owning node, no extra data) and resets the timer handle.
// When the sync environment is not initialised only an error is logged and 0 is still returned.
// A failing taosTmrReset is propagated through TAOS_CHECK_RETURN.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                                 syncEnv()->pTimerManager, &pSyncNode->pElectTimer));
  return 0;
}
1663

1664
// Stop the elect timer: bump the elect logic clock, stop the timer handle and forget it.
// Always returns 0; the result of taosTmrStop is only logged.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool stopped = taosTmrStop(pSyncNode->pElectTimer);
  sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1673

1674
// Stop the elect timer and start it again with a timeout of `ms` milliseconds.
// Either step's failure is propagated through TAOS_CHECK_RETURN; returns 0 otherwise.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1680

1681
// Restart the elect timer with a fresh timeout.
// A stand-by node uses TIMER_MAX_MS; any other node asks syncUtilElectRandomMS for a value based
// on the range [electBaseLine, 2 * electBaseLine]. A failed restart is logged and the function
// returns without emitting the trace line.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = 0;
  if (!pSyncNode->raftCfg.isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  } else {
    electMS = TIMER_MAX_MS;
  }

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1700

1701
#ifdef BUILD_NO_CALL
1702
// (Re)arm the node-level heartbeat timer with heartbeatTimerMS and publish the user logic clock as
// the current heartbeat timer logic clock.
// When the sync environment is not initialised only an error is logged; the trace line is emitted
// and 0 returned in both cases. A failing taosTmrReset is propagated through TAOS_CHECK_RETURN.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                   syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}
1715
#endif
1716

1717
// Start the per-peer heartbeat timer of every peer that has one.
// The first failing syncHbTimerStart is propagated through TAOS_CHECK_RETURN; returns 0 otherwise.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;  // no timer registered for this peer
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return ret;
}
1734

1735
// Stop the per-peer heartbeat timer of every peer that has one.
// The first failing syncHbTimerStop is propagated through TAOS_CHECK_RETURN; returns 0 otherwise.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

#if 0
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;  // no timer registered for this peer
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return code;
}
1754

1755
#ifdef BUILD_NO_CALL
1756
// Stop and then start the per-peer heartbeat timers.
// Either step's failure is propagated through TAOS_CHECK_RETURN; returns 0 otherwise.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1763
#endif
1764

1765
// Send a sync message to the peer whose raft address equals destRaftId->addr.
// The message is converted with syncUtilMsgHtoN, marked noResp and handed to the node's
// syncSendMSg callback. When no peer matches, no callback is installed, or the callback returns a
// negative code, the failure is logged and pMsg->pCont is released with rpcFreeCont.
// Returns the callback's result, or -1 when the message could not be handed over at all.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the destination among our peers
  SEpSet* pDestEpSet = NULL;
  int32_t peer = 0;
  while (peer < pNode->peersNum) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pDestEpSet = &pNode->peersEpset[peer];
      break;
    }
    ++peer;
  }

  int32_t code = -1;
  if (pDestEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pDestEpSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code),
           pDestEpSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1789

1790
// Tell whether this node is a member of pCfg.
// Membership is evaluated twice: by comparing fqdn + port of each configured node with
// pNode->myNodeInfo, and by comparing the raft id built from each configured node with
// pNode->myRaftId. If the two answers disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and
// false is returned; otherwise the common answer is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  // lookup by endpoint (fqdn + port)
  bool foundByEp = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundByEp; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    foundByEp = (strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
                (pInfo->nodePort == pNode->myNodeInfo.nodePort);
  }

  // lookup by raft id
  bool foundById = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundById; ++i) {
    SRaftId raftId = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    foundById = syncUtilSameId(&raftId, &pNode->myRaftId);
  }

  if (foundByEp != foundById) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return foundByEp;
}
1820

1821
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,339✔
1822
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,339✔
1823
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,281✔
1824
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,373✔
1825
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,818✔
1826
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,818✔
1827
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,818!
1828
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,818✔
1829
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,816✔
1830
  }
1831

1832
  return false;
1,555✔
1833
}
1834

1835
// Install a new raft configuration on the sync node and persist it.
//
//   pSyncNode              node whose configuration is replaced
//   pNewConfig             configuration to install; copied into pSyncNode->raftCfg.cfg
//   lastConfigChangeIndex  stored as raftCfg.lastConfigIndex and appended to raftCfg.configIndexArr
//
// When this node is part of the new configuration, its own identity, peer tables, replica ids,
// quorum, index managers and vote managers are rebuilt, and the snapshot senders are re-bound to
// the new replica indexes (senders of replicas that disappeared are destroyed, missing ones are
// created). The configuration file is written in either case.
//
// Returns 0 on success or when nothing changed; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when
// configIndexArr is full; terrno when a raft id cannot be derived from a node info; failures of
// snapshotSenderCreate / syncWriteCfgFile are propagated through TAOS_CHECK_RETURN.
// NOTE(review): on the error paths the in-memory configuration has already been replaced.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // NOTE(review): "replicas" is taken to mean the new configuration's replicaNum -- confirm.
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they were bound to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: re-bind the sender of every replica that existed before to its new index
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved to the new slot

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], true);

          break;
        }
      }
    }

    // free old: whatever is still in oldSenders was not re-bound above. This is done before the
    // creation loop so that a failing snapshotSenderCreate cannot leak these senders.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2007

2008
// raft state change --------------
2009
// Raise the stored raft term to `term` when `term` is greater than the current one; the node's
// role is left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
1,802✔
2014

2015
// Step down in reaction to observing term `newTerm`.
//  - newTerm older than the current term: ignored (trace only).
//  - an assigned leader regenerates its arb token under arbTokenMutex.
//  - newTerm newer than the current term: store it, become follower, clear the vote.
//  - same term: become follower only if the node is not a follower already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  {
    sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, reason);

    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}
2045

2046
// On a leadership change, clean the responses still held by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
5,106✔
2047

2048
// Switch the node to the follower role.
// Clears the leader cache when the node was leader, resets hbSlowNum, records the role change time,
// stops the heartbeat timers, cleans pending client responses, invokes the fsm's become-follower
// callback (if any), invalidates minMatchIndex, resets the log buffer and re-arms the elect timer.
// `debugStr` is only used in log output. If stopping the heartbeat timers or resetting the log
// buffer fails, the error is logged and the remaining steps are skipped.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  int32_t code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2089

2090
// Switch the node to the learner role.
// Resets hbSlowNum, records the role change time, invokes the fsm's become-learner callback
// (if any), invalidates minMatchIndex and resets the log buffer; a failed reset is only logged.
// `debugStr` is only used in log output.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // trace log
  sNTrace(pSyncNode, "become learner %s", debugStr);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  int32_t code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
  }
}
2115

2116
// TLA+ Spec
2117
// \* Candidate i transitions to leader.
2118
// BecomeLeader(i) ==
2119
//     /\ state[i] = Candidate
2120
//     /\ votesGranted[i] \in Quorum
2121
//     /\ state'      = [state EXCEPT ![i] = Leader]
2122
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2123
//                          [j \in Server |-> Len(log[i]) + 1]]
2124
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2125
//                          [j \in Server |-> 0]]
2126
//     /\ elections'  = elections \cup
2127
//                          {[eterm     |-> currentTerm[i],
2128
//                            eleader   |-> i,
2129
//                            elog      |-> log[i],
2130
//                            evotes    |-> votesGranted[i],
2131
//                            evoterLog |-> voterLog[i]]}
2132
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2133
//
2134
// Switch the node to the leader role.
// Steps, in order: bump becomeLeaderNum, reset hbrSlowNum and restoreFinish, record the new state
// and role change time, point the leader cache at this node, set every nextIndex entry to
// lastIndex + 1 and every matchIndex entry to SYNC_INDEX_INVALID, initialise the peer states, stop
// a started snapshot receiver, stop the elect timer, start the heartbeat timers, heartbeat the
// peers at once, invoke the fsm's become-leader callback (if any), invalidate minMatchIndex and
// reset the log buffer.
// `debugStr` is only used in log output. Any failing step is logged and aborts the remaining ones.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // next index of every replica starts right after our last log entry
  for (int32_t r = 0; r < pSyncNode->pNextIndex->replicaNum; ++r) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[r] = lastIndex + 1;
  }

  for (int32_t r = 0; r < pSyncNode->pMatchIndex->replicaNum; ++r) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[r] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2225

2226
// Switch this node into the TAOS_SYNC_STATE_ASSIGNED_LEADER role.
//
// Steps, in order: bump counters, record the new state and role time, point the leader
// cache at ourselves, reset every next/match index, re-init peer send state, stop
// pNewNodeReceiver if it reports started, stop the elect timer, start the heartbeat timer,
// send a heartbeat to peers, invoke FpBecomeAssignedLeaderCb (when set), and reset the
// log buffer.
//
// Any step that returns a non-zero code is logged and aborts the transition right there;
// nothing done by the earlier steps is rolled back.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  // pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Every replica's next index starts right after our own last index. The last index does
  // not depend on the replica, so it is fetched once (and only when there is at least one
  // replica, exactly as before) instead of once per loop iteration.
  if (pSyncNode->pNextIndex->replicaNum > 0) {
    SyncIndex lastIndex = SYNC_INDEX_INVALID;
    SyncTerm  lastTerm = 0;
    if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
      pSyncNode->pNextIndex->index[i] = lastIndex + 1;
    }
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2304

2305
// Promote a candidate to leader.
// Requires the node to be in TAOS_SYNC_STATE_CANDIDATE and voteGrantedMajority() to hold;
// otherwise an error is logged and nothing changes. After syncNodeBecomeLeader() a noop
// entry is appended (a failure there is only logged) and the new term, commit index and
// last log index are reported.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t appendCode = syncNodeAppendNoop(pSyncNode);
  if (appendCode < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);
}
2329

2330
// True when this sync node belongs to vgroup 1 (the vgId this function treats as the mnode).
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (1 == pSyncNode->vgId);
  return isMnode;
}
121,935✔
2331

2332
// Reset the send bookkeeping of every peerStates slot (voters and learners alike):
// lastSendIndex becomes SYNC_INDEX_INVALID and lastSendTime becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < slotNum) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    ++slot;
  }

  return 0;
}
2340

2341
// Move a follower into the candidate state and stamp the role-change time.
// A node that is not currently a follower is left untouched and an error is logged.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
    pSyncNode->roleTimeMs = taosGetTimestampMs();

    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
          pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

    sNTrace(pSyncNode, "follower to candidate");
  } else {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
  }
}
2354

2355
// Promote an assigned leader to a regular leader.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not in
// TAOS_SYNC_STATE_ASSIGNED_LEADER, otherwise 0. A failure to append the noop entry is
// logged but does not change the return value.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // The ", " before "assigned commit index" keeps that field from being glued onto the
  // commit index digits in the log line.
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2373

2374
// just called by syncNodeVoteForSelf
2375
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,257✔
2376
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,257✔
2377
  if (term != storeTerm) {
1,257!
2378
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2379
    return;
×
2380
  }
2381
  bool voted = raftStoreHasVoted(pSyncNode);
1,257✔
2382
  if (voted) {
1,257!
2383
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2384
    return;
×
2385
  }
2386

2387
  raftStoreVote(pSyncNode, pRaftId);
1,257✔
2388
}
2389

2390
// simulate get vote from outside
2391
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,257✔
2392
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,257✔
2393

2394
  SRpcMsg rpcMsg = {0};
1,257✔
2395
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,257✔
2396
  if (ret != 0) return;
1,257!
2397

2398
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,257✔
2399
  pMsg->srcId = pSyncNode->myRaftId;
1,257✔
2400
  pMsg->destId = pSyncNode->myRaftId;
1,257✔
2401
  pMsg->term = currentTerm;
1,257✔
2402
  pMsg->voteGranted = true;
1,257✔
2403

2404
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,257✔
2405
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,257✔
2406
  rpcFreeCont(rpcMsg.pCont);
1,257✔
2407
}
2408

2409
// return if has a snapshot
2410
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
18,368✔
2411
  bool      ret = false;
18,368✔
2412
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
18,368✔
2413
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
18,368!
2414
    // TODO check return value
2415
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
18,368✔
2416
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
18,368✔
2417
      ret = true;
2,549✔
2418
    }
2419
  }
2420
  return ret;
18,368✔
2421
}
2422

2423
// return max(logLastIndex, snapshotLastIndex)
2424
// if no snapshot and log, return -1
2425
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
18,369✔
2426
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
18,369✔
2427
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
18,369!
2428
    // TODO check return value
2429
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
18,369✔
2430
  }
2431
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
18,369✔
2432

2433
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
18,369✔
2434
  return lastIndex;
18,369✔
2435
}
2436

2437
// return the last term of snapshot and log
2438
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2439
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
18,368✔
2440
  SyncTerm lastTerm = 0;
18,368✔
2441
  if (syncNodeHasSnapshot(pSyncNode)) {
18,368✔
2442
    // has snapshot
2443
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,549✔
2444
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,549!
2445
      // TODO check return value
2446
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,549✔
2447
    }
2448

2449
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,549✔
2450
    if (logLastIndex > snapshot.lastApplyIndex) {
2,549✔
2451
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,586✔
2452
    } else {
2453
      lastTerm = snapshot.lastApplyTerm;
963✔
2454
    }
2455

2456
  } else {
2457
    // no snapshot
2458
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
15,819✔
2459
  }
2460

2461
  return lastTerm;
18,368✔
2462
}
2463

2464
// get last index and term along with snapshot
2465
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
16,220✔
2466
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
16,220✔
2467
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
16,220✔
2468
  return 0;
16,220✔
2469
}
2470

2471
#ifdef BUILD_NO_CALL
2472
// return append-entries first try index
2473
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2474
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2475
  return syncStartIndex;
2476
}
2477

2478
// if index > 0, return index - 1
2479
// else, return -1
2480
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2481
  SyncIndex preIndex = index - 1;
2482
  if (preIndex < SYNC_INDEX_INVALID) {
2483
    preIndex = SYNC_INDEX_INVALID;
2484
  }
2485

2486
  return preIndex;
2487
}
2488

2489
// if index < 0, return SYNC_TERM_INVALID
2490
// if index == 0, return 0
2491
// if index > 0, return preTerm
2492
// if error, return SYNC_TERM_INVALID
2493
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2494
  if (index < SYNC_INDEX_BEGIN) {
2495
    return SYNC_TERM_INVALID;
2496
  }
2497

2498
  if (index == SYNC_INDEX_BEGIN) {
2499
    return 0;
2500
  }
2501

2502
  SyncTerm  preTerm = 0;
2503
  SyncIndex preIndex = index - 1;
2504

2505
  SSyncRaftEntry* pPreEntry = NULL;
2506
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2507
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2508
  int32_t         code = 0;
2509
  if (h) {
2510
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2511
    code = 0;
2512

2513
    pSyncNode->pLogStore->cacheHit++;
2514
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2515

2516
  } else {
2517
    pSyncNode->pLogStore->cacheMiss++;
2518
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2519

2520
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2521
  }
2522

2523
  SSnapshot snapshot = {.data = NULL,
2524
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2525
                        .lastApplyTerm = SYNC_TERM_INVALID,
2526
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2527

2528
  if (code == 0) {
2529
    if (pPreEntry == NULL) return -1;
2530
    preTerm = pPreEntry->term;
2531

2532
    if (h) {
2533
      taosLRUCacheRelease(pCache, h, false);
2534
    } else {
2535
      syncEntryDestroy(pPreEntry);
2536
    }
2537

2538
    return preTerm;
2539
  } else {
2540
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2541
      // TODO check return value
2542
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2543
      if (snapshot.lastApplyIndex == preIndex) {
2544
        return snapshot.lastApplyTerm;
2545
      }
2546
    }
2547
  }
2548

2549
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2550
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2551
  return SYNC_TERM_INVALID;
2552
}
2553

2554
// get pre index and term of "index"
2555
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2556
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2557
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2558
  return 0;
2559
}
2560
#endif
2561

2562
static void syncNodeEqPingTimer(void* param, void* tmrId) {
46,298✔
2563
  if (!syncIsInit()) return;
46,298!
2564

2565
  int64_t    rid = (int64_t)param;
46,298✔
2566
  SSyncNode* pNode = syncNodeAcquire(rid);
46,298✔
2567

2568
  if (pNode == NULL) return;
46,298!
2569

2570
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
46,298!
2571
    SRpcMsg rpcMsg = {0};
46,298✔
2572
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
46,298✔
2573
                                    pNode->pingTimerMS, pNode);
2574
    if (code != 0) {
46,298!
2575
      sError("failed to build ping msg");
×
2576
      rpcFreeCont(rpcMsg.pCont);
×
2577
      goto _out;
×
2578
    }
2579

2580
    // sTrace("enqueue ping msg");
2581
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
46,298✔
2582
    if (code != 0) {
46,298!
2583
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
2584
      rpcFreeCont(rpcMsg.pCont);
×
2585
      goto _out;
×
2586
    }
2587

2588
  _out:
46,298✔
2589
    if ((code = taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
46,298!
2590
                             &pNode->pPingTimer)) != 0) {
2591
      sError("failed to reset ping timer since %s", tstrerror(code));
×
2592
    };
2593
  }
2594
  syncNodeRelease(pNode);
46,298✔
2595
}
2596

2597
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,261✔
2598
  if (!syncIsInit()) return;
1,262!
2599

2600
  int64_t    rid = (int64_t)param;
1,261✔
2601
  SSyncNode* pNode = syncNodeAcquire(rid);
1,261✔
2602

2603
  if (pNode == NULL) return;
1,261✔
2604

2605
  if (pNode->syncEqMsg == NULL) {
1,260!
2606
    syncNodeRelease(pNode);
×
2607
    return;
×
2608
  }
2609

2610
  int64_t tsNow = taosGetTimestampMs();
1,260✔
2611
  if (tsNow < pNode->electTimerParam.executeTime) {
1,260!
2612
    syncNodeRelease(pNode);
×
2613
    return;
×
2614
  }
2615

2616
  SRpcMsg rpcMsg = {0};
1,260✔
2617
  int32_t code =
2618
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,260✔
2619

2620
  if (code != 0) {
1,260!
2621
    sError("failed to build elect msg");
×
2622
    syncNodeRelease(pNode);
×
2623
    return;
×
2624
  }
2625

2626
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,260✔
2627
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,260!
2628

2629
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,260✔
2630
  if (code != 0) {
1,260!
2631
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2632
    rpcFreeCont(rpcMsg.pCont);
×
2633
    syncNodeRelease(pNode);
×
2634
    return;
×
2635
  }
2636

2637
  syncNodeRelease(pNode);
1,260✔
2638
}
2639

2640
#ifdef BUILD_NO_CALL
2641
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2642
  if (!syncIsInit()) return;
2643

2644
  int64_t    rid = (int64_t)param;
2645
  SSyncNode* pNode = syncNodeAcquire(rid);
2646

2647
  if (pNode == NULL) return;
2648

2649
  if (pNode->totalReplicaNum > 1) {
2650
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2651
      SRpcMsg rpcMsg = {0};
2652
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2653
                                      pNode->heartbeatTimerMS, pNode);
2654

2655
      if (code != 0) {
2656
        sError("failed to build heartbeat msg");
2657
        goto _out;
2658
      }
2659

2660
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2661
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2662
      if (code != 0) {
2663
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2664
        rpcFreeCont(rpcMsg.pCont);
2665
        goto _out;
2666
      }
2667

2668
    _out:
2669
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2670
                       &pNode->pHeartbeatTimer) != 0)
2671
        return;
2672

2673
    } else {
2674
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2675
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2676
    }
2677
  }
2678
}
2679
#endif
2680

2681
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
43,636✔
2682
  int32_t code = 0;
43,636✔
2683
  int64_t hbDataRid = (int64_t)param;
43,636✔
2684
  int64_t tsNow = taosGetTimestampMs();
43,636✔
2685

2686
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
43,636✔
2687
  if (pData == NULL) {
43,636!
2688
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2689
    return;
×
2690
  }
2691

2692
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
43,636✔
2693
  if (pSyncNode == NULL) {
43,636✔
2694
    syncHbTimerDataRelease(pData);
3✔
2695
    sError("hb timer get pSyncNode NULL");
3!
2696
    return;
3✔
2697
  }
2698

2699
  SSyncTimer* pSyncTimer = pData->pTimer;
43,633✔
2700

2701
  if (!pSyncNode->isStart) {
43,633!
2702
    syncNodeRelease(pSyncNode);
×
2703
    syncHbTimerDataRelease(pData);
×
2704
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2705
    return;
×
2706
  }
2707

2708
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
43,633!
2709
    syncNodeRelease(pSyncNode);
×
2710
    syncHbTimerDataRelease(pData);
×
2711
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2712
    return;
×
2713
  }
2714

2715
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
43,633!
2716
         pData->destId.addr);
2717

2718
  if (pSyncNode->totalReplicaNum > 1) {
43,633✔
2719
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
43,631✔
2720
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
43,631✔
2721

2722
    if (timerLogicClock == msgLogicClock) {
43,631✔
2723
      if (tsNow > pData->execTime) {
43,629✔
2724
        pData->execTime += pSyncTimer->timerMS;
43,609✔
2725

2726
        SRpcMsg rpcMsg = {0};
43,609✔
2727
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
43,609!
2728
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2729
          syncNodeRelease(pSyncNode);
×
2730
          syncHbTimerDataRelease(pData);
×
2731
          return;
×
2732
        }
2733

2734
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
43,609✔
2735

2736
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
43,609✔
2737
        pSyncMsg->srcId = pSyncNode->myRaftId;
43,609✔
2738
        pSyncMsg->destId = pData->destId;
43,609✔
2739
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
43,609✔
2740
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
43,609✔
2741
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
43,609✔
2742
        pSyncMsg->privateTerm = 0;
43,609✔
2743
        pSyncMsg->timeStamp = tsNow;
43,609✔
2744

2745
        // update reset time
2746
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
43,609✔
2747
        pSyncTimer->timeStamp = tsNow;
43,609✔
2748

2749
        // send msg
2750
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
43,609✔
2751
        STraceId* trace = &(rpcMsg.info.traceId);
43,609✔
2752
        sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
43,609!
2753
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
43,609✔
2754
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
43,609✔
2755
        if (ret != 0) {
43,609✔
2756
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
62!
2757
        }
2758
      }
2759

2760
      if (syncIsInit()) {
43,629!
2761
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
43,629!
2762
        if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
43,629!
2763
                                 syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
43,629✔
2764
          sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
×
2765
          syncNodeRelease(pSyncNode);
×
2766
          syncHbTimerDataRelease(pData);
×
2767
          return;
×
2768
        }
2769
      } else {
2770
        sError("sync env is stop, reset peer hb timer error");
×
2771
      }
2772

2773
    } else {
2774
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
2!
2775
             timerLogicClock, msgLogicClock);
2776
    }
2777
  }
2778

2779
  syncHbTimerDataRelease(pData);
43,633✔
2780
  syncNodeRelease(pSyncNode);
43,633✔
2781
}
2782

2783
#ifdef BUILD_NO_CALL
2784
// LRU cache deleter: frees the cached value with taosMemoryFree. The key, its length and
// the user data are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2788

2789
// Insert pEntry into the log store's LRU cache, keyed by its index and charged with the
// size of the entry header plus dataLen. Cached values are freed by deleteCacheEntry.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK and -1 otherwise. *h receives the
// cache handle as filled in by taosLRUCacheInsert.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2803
#endif
2804

2805
// Build a sync configuration from an alter-replica request.
// Voters (pReq->replicas) fill cfg->nodeInfo first, learners (pReq->learnerReplicas) follow
// them. replicaNum ends up as the voter count and totalReplicaNum as voters plus learners.
// myIndex is taken from selfIndex, and overridden by replica + learnerSelfIndex when the
// latter is not -1; it is left untouched when both are -1.
// TODO SAlterVnodeReplicaReq name is proper?
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int v = 0; v < pReq->replica; ++v) {
    SNodeInfo* pVoter = &cfg->nodeInfo[v];

    pVoter->nodeRole = TAOS_SYNC_ROLE_VOTER;
    pVoter->nodeId = pReq->replicas[v].id;
    pVoter->nodePort = pReq->replicas[v].port;
    tstrncpy(pVoter->nodeFqdn, pReq->replicas[v].fqdn, sizeof(pVoter->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pVoter->nodeId, &pVoter->clusterId, pVoter->nodeFqdn, &pVoter->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, v, pVoter->nodeFqdn,
          pVoter->nodePort, pVoter->nodeId, pVoter->nodeRole, update);

    cfg->replicaNum++;
  }

  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: until the final adjustment below, totalReplicaNum counts the learners added
  // so far and doubles as the index into pReq->learnerReplicas
  const int slotEnd = pReq->replica + pReq->learnerReplica;
  for (int slot = cfg->replicaNum; slot < slotEnd; ++slot) {
    SNodeInfo* pLearner = &cfg->nodeInfo[slot];

    pLearner->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    pLearner->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pLearner->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    tstrncpy(pLearner->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pLearner->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pLearner->nodeId, &pLearner->clusterId, pLearner->nodeFqdn, &pLearner->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pLearner->nodeFqdn,
          pLearner->nodePort, pLearner->nodeId, pLearner->nodeRole, update);

    cfg->totalReplicaNum++;
  }
  cfg->totalReplicaNum += pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  cfg->changeVersion = pReq->changeVersion;
}
×
2841

2842
// Inspect a config-change log entry and step down if this (assigned) leader is not part of
// the new configuration.
//
// Returns:
//   TSDB_CODE_SYN_INTERNAL_ERROR  the entry is not a TDMT_SYNC_CONFIG_CHANGE entry
//   TSDB_CODE_INVALID_MSG         the request payload cannot be deserialized
//   1                             this node was leading, is absent from the new config,
//                                 and syncNodeStepDown() was called
//   0                             otherwise
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the request body follows the message head inside the entry payload
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  if (newCfg.totalReplicaNum < 1) {
    return 0;
  }
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return 0;
  }

  // a node is identified by its fqdn and port
  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    bool sameFqdn = (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0);
    if (sameFqdn && ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort) {
      return 0;
    }
  }

  // we are leading but no longer a member
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  syncNodeStepDown(ths, currentTerm);
  return 1;
}
2879

2880
// Dump the node's current membership view to the info log, each line prefixed with `str`:
// summary counters, our own node info, then per-peer node info, endpoint sets and raft
// ids, and finally every node and replica id of the persisted raft configuration.
// `cfg` is not used by this function.
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = 256;
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    // Format peer i's own endpoint set. The endpoint index is `j` so it cannot shadow the
    // peer index, and peersEpset is indexed by `i` rather than always reading element 0.
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
2926

2927
// Rebuild the peer tables and the stored raft configuration from `cfg`.
//
// Pass 1 copies every node of `cfg` other than ourselves (matched by fqdn + port) into
// peersNodeInfo and derives its endpoint set and raft id; peersNum is set to the number of
// peers copied. If a raft id cannot be derived, terrno is returned immediately and neither
// peersNum nor the stored configuration has been updated yet.
// Pass 2 copies all nodes into raftCfg.cfg.nodeInfo, counts the voters into replicaNum,
// records our own position in myIndex and sets totalReplicaNum.
//
// Returns 0 on success.
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];

    bool isSelf =
        (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0) && (ths->myNodeInfo.nodePort == pSrc->nodePort);
    if (isSelf) continue;

    ths->peersNodeInfo[peerCnt].nodeRole = pSrc->nodeRole;
    ths->peersNodeInfo[peerCnt].clusterId = pSrc->clusterId;
    tstrncpy(ths->peersNodeInfo[peerCnt].nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    ths->peersNodeInfo[peerCnt].nodeId = pSrc->nodeId;
    ths->peersNodeInfo[peerCnt].nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[peerCnt], &ths->peersEpset[peerCnt]);

    if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[peerCnt], ths->vgId, &ths->peersId[peerCnt]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t nodeCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j, ++nodeCnt) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeRole = pSrc->nodeRole;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].clusterId = pSrc->clusterId;
    tstrncpy(ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeId = pSrc->nodeId;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodePort = pSrc->nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = nodeCnt;
    }
  }
  ths->raftCfg.cfg.totalReplicaNum = nodeCnt;

  return 0;
}
2975

2976
// Promote nodes to voter according to the new configuration `cfg`.
//
// A peer in ths->peersNodeInfo, or a member in ths->raftCfg.cfg.nodeInfo, is promoted when
// `cfg` contains an entry with the same fqdn and port whose role is TAOS_SYNC_ROLE_VOTER.
// ths->raftCfg.cfg.replicaNum is reset to 0 and incremented once per such match.
// Roles are only raised to voter here; no entry is demoted.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // peers
  for (int32_t peer = 0; peer < ths->peersNum; ++peer) {
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      if (cfg->nodeInfo[k].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      if (ths->peersNodeInfo[peer].nodePort != cfg->nodeInfo[k].nodePort) continue;
      if (strcmp(ths->peersNodeInfo[peer].nodeFqdn, cfg->nodeInfo[k].nodeFqdn) != 0) continue;

      ths->peersNodeInfo[peer].nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // raft config members, recounting the voters on the way
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t member = 0; member < ths->raftCfg.cfg.totalReplicaNum; ++member) {
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      if (cfg->nodeInfo[k].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      if (ths->raftCfg.cfg.nodeInfo[member].nodePort != cfg->nodeInfo[k].nodePort) continue;
      if (strcmp(ths->raftCfg.cfg.nodeInfo[member].nodeFqdn, cfg->nodeInfo[k].nodeFqdn) != 0) continue;

      ths->raftCfg.cfg.nodeInfo[member].nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3003

3004
// Rebuild the per-replica structures of the sync node from ths->raftCfg.cfg, keeping the
// state of every replica that is present both before and after the change.
//
// Rebuilt in order: replicasId, pMatchIndex, pNextIndex, vote managers, logReplMgrs,
// snapshot senders, heartbeat timers and peerStates.
//
// ths                 sync node; ths->raftCfg.cfg must already describe the new membership
// oldtotalReplicaNum  totalReplicaNum before the change, bounds the lookups in the old arrays
//
// Returns 0 on success. On failure returns terrno or the code of the failing helper; the
// node may be left partially rebuilt.
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // temporary by-value backup of the old managers; only needed until the copy-back below
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  SSyncLogReplMgr* oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // Last use of the backup: release it here so that none of the error paths below can
  // leak it (the snapshot-sender failure path used to return without freeing it).
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = code;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3155

3156
// Apply the voter count of ths->raftCfg.cfg to the node and to its match/next index
// managers, then refresh both vote managers. replicasId and logReplMgrs are left as they
// are; the debug output dumps the replica ids and the current match indexes.
void syncNodeChangeToVoter(SSyncNode* ths) {
  int32_t idx = 0;

  // replicasId: only replicaNum changes
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  idx = 0;
  while (idx < ths->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, idx, ths->replicasId[idx].addr);
    ++idx;
  }

  // pMatchIndex / pNextIndex: only replicaNum changes
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  idx = 0;
  while (idx < ths->pMatchIndex->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, idx, ths->pMatchIndex->index[idx]);
    ++idx;
  }

  // vote bookkeeping
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs are intentionally not touched
}
×
3180

3181
// Zero the first ths->peersNum entries of ths->peersNodeInfo and the first
// ths->raftCfg.cfg.totalReplicaNum entries of ths->raftCfg.cfg.nodeInfo.
// The counters themselves are not modified.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  int32_t idx;

  for (idx = 0; idx < ths->peersNum; ++idx) {
    memset(&ths->peersNodeInfo[idx], 0, sizeof(SNodeInfo));
  }

  for (idx = 0; idx < ths->raftCfg.cfg.totalReplicaNum; ++idx) {
    memset(&ths->raftCfg.cfg.nodeInfo[idx], 0, sizeof(SNodeInfo));
  }
}
×
3191

3192
// Apply a configuration-change log entry to the sync node.
//
// ths     sync node to reconfigure
// pEntry  raft entry; its originalRpcType must be TDMT_SYNC_CONFIG_CHANGE and its data must
//         hold an SMsgHead followed by the serialized alter-replica request
// str     caller tag used in the log output; "Commit" selects the commit-style log line
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR for a non config-change entry, TSDB_CODE_INVALID_MSG
// when the request cannot be deserialized, 0 when the entry is skipped because its
// changeVersion is not newer than the current one, otherwise the code of the first failing
// rebuild step or of syncWriteCfgFile.
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0;
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // stale or duplicated change: nothing to do
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") != 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // ---- remove replica ----
    bool selfKept = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        selfKept = true;
        break;
      }
    }

    // both sub-cases start from a wiped peer list and cfg node list
    syncNodeResetPeerAndCfg(ths);

    if (selfKept) {
      // another replica is removed; myNodeInfo stays as it is
      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // this node is the one being removed: shrink to a lone learner, which
      // reduces the chance of it disrupting the remaining servers
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // ---- change replica type ----
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // myNodeInfo: become voter if the new config says so
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      bool isMe = strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort;
      if (isMe && cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // peers and cfg, then the derived structures
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);
    syncNodeChangeToVoter(ths);

    // a learner that has just been made voter continues as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // ---- add replica ----
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // myNodeInfo and state are not changed in this case
    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3365

3366
// Append `pEntry` to the log buffer of `ths` and drive the buffer forward.
//
// An entry shorter than an SMsgHead is rejected; an entry the log buffer refuses is handed
// to syncFsmExecute together with terrno. In both cases the entry is destroyed here.
// Afterwards the buffer is proceeded, and the commit index is advanced when the node is an
// assigned leader or when there is at most one voting replica.
//
// Returns the resulting code through TAOS_RETURN.
// NOTE(review): with replicaNum <= 1 the result of syncLogBufferCommit replaces whatever
// `code` held before, including an earlier append failure - confirm this is intended.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = -1;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    // the buffer did not take the entry: report the failure through the fsm, then drop it
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    int32_t fsmCode = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (fsmCode != 0) {
      sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(fsmCode));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, replicating where needed; done on the failure paths as well
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");

  if (pEntry != NULL) {
    sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
           ", %" PRId64 ")",
           ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
           ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    TAOS_CHECK_RETURN(syncNodeUpdateAssignedCommitIndex(ths, matchIndex));

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica: committing is driven by replication, nothing more to do here
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica: commit right away
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex);
    if (code < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
    }
  }

  TAOS_RETURN(code);
}
3426

3427
// Tell whether at least `quorum` non-learner peers have been silent for longer than
// tsHeartbeatTimeout. Peers whose recorded receive time is 0 or -1 are not counted.
// Always false when the node has a single replica in total.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t expired = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    if (pSyncNode->peersNodeInfo[peer].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
      int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
      bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);

      if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
        expired++;
      }
    }
  }

  return expired >= pSyncNode->quorum;
}
3452

3453
// True when any of the first totalReplicaNum snapshot senders exists and has its `start`
// flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  int32_t idx = 0;
  while (idx < pSyncNode->totalReplicaNum) {
    if (pSyncNode->senders[idx] != NULL && pSyncNode->senders[idx]->start) {
      return true;
    }
    ++idx;
  }

  return false;
}
3464

3465
// True when the node has a new-node snapshot receiver whose `start` flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}
3471

3472
// Build a no-op entry at the end index of the log buffer, stamped with the current term,
// and append it through syncNodeAppend.
// Returns TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise the result of
// syncNodeAppend.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);

  int32_t code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop);
  TAOS_RETURN(code);
}
3486

3487
#ifdef BUILD_NO_CALL
3488
// Legacy no-op append, only compiled with BUILD_NO_CALL.
// Builds a no-op entry at the log store's write index; when the node is leader the entry is
// appended to the log store and put into the entry cache.
// Returns 0 on success, -1 when the entry cannot be built or appended.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry is still owned by this function here; release it instead of leaking it
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  // a cache handle means the cache holds the entry; otherwise it must be destroyed here
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
3516
#endif
3517

3518
// Handle a heartbeat message.
//
// The message is dropped (return 0) when its sender is not in this node's raft group.
// Otherwise a heartbeat reply is built and:
//   - a learner adopts a higher term from the message;
//   - a non-leader at the same term records the receive time, takes over minMatchIndex and,
//     when follower or learner, enqueues a local commit command;
//   - a leader / assigned leader seeing a term >= its own enqueues a local step-down command;
//   - the reply is sent, and the election timer is reset if the heartbeat was accepted.
//
// Returns 0 on success, or the code of the failing build/send helper. Every message built
// here is released on the paths where it is neither enqueued nor passed to
// syncNodeSendMsgById.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t        code = 0;
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
      if (code != 0) {
        // the reply built above is never sent on this path: release it before bailing out
        rpcFreeCont(rpcMsg.pCont);
        TAOS_RETURN(code);
      }

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t eqCode = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (eqCode != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(),
                 eqCode);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      } else {
        // no queue to hand the command to: it would otherwise be leaked
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
    if (code != 0) {
      // same as above: do not leak the unsent reply
      rpcFreeCont(rpcMsg.pCont);
      TAOS_RETURN(code);
    }

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t eqCode = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (eqCode != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, eqCode);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // no queue to hand the command to: it would otherwise be leaked
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3615

3616
// Handle a heartbeat reply from a peer.
//
// Looks up the log replication manager of the replying peer, records the receive time in
// pMatchIndex and passes the reply on to syncLogReplProcessHeartbeatReply.
//
// Returns the result of syncLogReplProcessHeartbeatReply, or terrno /
// TSDB_CODE_SYN_RETURN_VALUE_NULL when no replication manager exists for the sender.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t         code = 0;
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "%016" (zero-padded, 16 hex digits) - the literal used to read "0x016%", which printed
    // a bogus "016" prefix followed by the unpadded address
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  // the receive time is what syncIndexMgrGetRecvTime reports for this peer afterwards
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3638

3639
#ifdef BUILD_NO_CALL
3640
// Legacy heartbeat-reply handler, only compiled with BUILD_NO_CALL.
// Logs the reply and records its receive time in pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  int64_t             tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  // remember when this peer answered last
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  return 0;
}
3655
#endif
3656

3657
// Execute a local command message.
//
// SYNC_LOCAL_CMD_STEP_DOWN steps the node down to the term carried by the message.
// SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT update the commit index when the
// message term equals the last match term of the log buffer, then commit the log buffer
// unless the fsm state is SYNC_FSM_STATE_INCOMPLETE.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative, otherwise 0
// (unknown commands and commit failures are only logged).
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pMsg, "");

  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pMsg->currentTerm);
    return 0;
  }

  if (pMsg->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pMsg->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // commit command
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (matchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pMsg->currentTerm == matchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
    sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
             ths->commitIndex);
    }
  }

  return 0;
}
3687

3688
// TLA+ Spec
3689
// ClientRequest(i, v) ==
3690
//     /\ state[i] = Leader
3691
//     /\ LET entry == [term  |-> currentTerm[i],
3692
//                      value |-> v]
3693
//            newLog == Append(log[i], entry)
3694
//        IN  log' = [log EXCEPT ![i] = newLog]
3695
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3696
//                    leaderVars, commitIndex>>
3697
//
3698

3699
// Turn a client request into a raft entry and append it to the log.
//
// ths        sync node
// pMsg       request; TDMT_SYNC_CLIENT_REQUEST messages are unpacked from pMsg->pCont, any
//            other message is wrapped as a whole
// pRetIndex  optional out parameter; receives the index chosen for the entry when the node
//            is leader or assigned leader
//
// For TDMT_SYNC_CONFIG_CHANGE a response stub is registered first. On a leader the change
// is checked with syncNodeCheckChangeConfig: a negative result is returned as an error, a
// positive result answers the request immediately and is returned without appending.
//
// Returns the result of syncNodeAppend, or TSDB_CODE_SYN_INTERNAL_ERROR when the entry
// cannot be built or the node is neither leader nor assigned leader.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t   code = 0;
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    pEntry->seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
  }

  // only a leader (elected or assigned) may accept the request
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    code = syncNodeCheckChangeConfig(ths, pEntry);

    if (code < 0) {
      sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
      syncEntryDestroy(pEntry);
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // answer the requester right away through the stub saved above; nothing is appended
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
             num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry);
  return code;
}
3762

3763
// Translate a sync state value into its printable name.
// Any value that is not one of the cases below yields "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      // keep the "unknown" fallback
      break;
  }

  return name;
}
3783

3784
// Find this node inside pNewCfg->nodeInfo and store its position in pNewCfg->myIndex.
//
// Each entry is turned into a raft id (entry address + this node's vgId) and compared
// with ths->myRaftId. The scan stops at the first match.
//
// Returns 0 when a match was found, TSDB_CODE_SYN_INTERNAL_ERROR otherwise
// (pNewCfg->myIndex is not written in that case).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;

  while (slot < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[slot]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }

    ++slot;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3799

3800
// True only when all three conditions hold, checked in this order:
//   1. the node has exactly one replica,
//   2. syncUtilUserCommit() accepts the message type,
//   3. the vgroup id is not 1.
// NOTE(review): vgId 1 is excluded explicitly; presumably it is a reserved group - confirm.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }

  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }

  return ths->vgId != 1;
}
3803

3804
// Report whether pRaftId equals one of the first ths->totalReplicaNum entries of
// ths->replicasId. The scan ends as soon as an equal entry is seen.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool isMember = false;

  for (int32_t pos = 0; !isMember && pos < ths->totalReplicaNum; ++pos) {
    if (syncUtilSameId(&((ths->replicasId)[pos]), pRaftId)) {
      isMember = true;
    }
  }

  return isMember;
}
3812

3813
// Look up the snapshot sender stored at the same index as the replica id equal to pDestId.
//
// Every replica slot is visited (there is no early exit), so if more than one slot
// matched, the sender of the last matching slot would be returned.
// Returns NULL when no replica id matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = (ths->senders)[slot];
    }
    ++slot;
  }

  return pFound;
}
3822

3823
// Return a pointer to the entry of ths->peerHeartbeatTimerArr that shares its index
// with the replica id equal to pDestId, or NULL when no replica id matches.
//
// All replica slots are examined without an early exit; the last matching slot wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pResult = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, ths->replicasId + slot)) {
      pResult = ths->peerHeartbeatTimerArr + slot;
    }
    ++slot;
  }

  return pResult;
}
3832

3833
// Return a pointer to the entry of ths->peerStates that shares its index with the
// replica id equal to pDestId, or NULL when no replica id matches.
//
// The loop never exits early, so the last matching slot determines the result.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pResult = NULL;

  for (int32_t slot = 0; slot != ths->totalReplicaNum && slot < ths->totalReplicaNum; slot++) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      continue;
    }
    pResult = &((ths->peerStates)[slot]);
  }

  return pResult;
}
3842

3843
#ifdef BUILD_NO_CALL
3844
// Decide whether an append-entries message should be sent to pDestId.
//
// Returns false when:
//   - no peer state exists for pDestId (an error is logged), or
//   - the index about to be sent (pMsg->prevLogIndex + 1) equals the peer's
//     lastSendIndex and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has elapsed
//     since the peer's lastSendTime.
// Returns true in every other case.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // A different index is always worth sending; the elapsed time is not consulted.
  if (pPeer->lastSendIndex != nextIndex) {
    return true;
  }

  // Same index as last time: hold back while the previous send is still within the timeout.
  if (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
    return false;
  }

  return true;
}
3860

3861
// Check whether the node is currently allowed to change.
//
// Returns false (after logging an error) when any of these holds:
//   - pSyncNode->changing is already set,
//   - commitIndex is at least SYNC_INDEX_BEGIN but differs from the last index
//     reported by syncNodeGetLastIndex(),
//   - some peer has a snapshot sender whose `start` flag is set.
// Returns true otherwise.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSnapSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pSnapSender != NULL && pSnapSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
3885
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc