• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.77
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Open a sync node described by pSyncInfo and register it via syncNodeAdd().
 *
 * On success the ping / election / heartbeat baselines and the message
 * callbacks are copied from pSyncInfo onto the node, and the node's rid is
 * returned. Returns -1 if the node cannot be opened, or if registration yields
 * a negative rid (in which case the node is closed again).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election and heartbeat timer settings
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
15,907✔
88
  int32_t    code = 0;
15,907✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,907✔
90
  if (pSyncNode == NULL) {
15,907!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
15,907!
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
15,907!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
15,907!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
15,907✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
15,907!
111

112
  TAOS_RETURN(code);
15,907✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
/*
 * Copy the current raft configuration of node `rid` into *cfg.
 *
 * Returns 0 on success. If the node cannot be acquired, logs an error and
 * returns terrno (or TSDB_CODE_SYN_RETURN_VALUE_NULL when terrno is 0).
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(code);
}
136

137
void syncStop(int64_t rid) {
15,907✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,907✔
139
  if (pSyncNode != NULL) {
15,907!
140
    pSyncNode->isStart = false;
15,907✔
141
    syncNodeRelease(pSyncNode);
15,907✔
142
    syncNodeRemove(rid);
15,907✔
143
  }
144
}
15,903✔
145

146
void syncPreStop(int64_t rid) {
15,907✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,907✔
148
  if (pSyncNode != NULL) {
15,907!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
15,907!
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
152
    }
153
    syncNodePreClose(pSyncNode);
15,907✔
154
    syncNodeRelease(pSyncNode);
15,907✔
155
  }
156
}
15,905✔
157

158
void syncPostStop(int64_t rid) {
13,901✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,901✔
160
  if (pSyncNode != NULL) {
13,901!
161
    syncNodePostClose(pSyncNode);
13,901✔
162
    syncNodeRelease(pSyncNode);
13,900✔
163
  }
164
}
13,900✔
165

166
/*
 * Validate a proposed configuration against the node's current one.
 *
 * The node must belong to pCfg (per syncNodeInConfig), and the replica count
 * may change by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}
170

171
/*
 * Apply a new sync configuration to node `rid`.
 *
 * - Returns 0 without changes if the node's lastConfigIndex is already
 *   >= pNewCfg->lastIndex.
 * - Returns TSDB_CODE_SYN_NEW_CONFIG_ERROR if the new config fails
 *   syncNodeCheckNewConfig() or syncNodeDoConfigChange().
 * - Returns any error from updating the config index or from restarting
 *   heartbeat timers.
 *
 * If the node is (assigned) leader, its heartbeat timers are stopped, every
 * peer heartbeat timer is re-initialised against replicasId[], and the
 * heartbeat timer is restarted.
 *
 * The acquired node is released exactly once on every path. The original
 * leaked the reference on each TAOS_CHECK_RETURN / do-change failure, and
 * logged through the node after releasing it.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // log before releasing: pSyncNode must not be used after syncNodeRelease()
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  if ((code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)) != 0) {
    goto _exit;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // The replica set may have changed: rebuild the per-peer heartbeat timers.
    if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
      goto _exit;
    }

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) {
        goto _exit;
      }
    }

    if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
      goto _exit;
    }
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
/*
 * Dispatch one sync-layer RPC message to the matching handler on node `rid`.
 *
 * Returns:
 * - the handler's result;
 * - TSDB_CODE_MSG_NOT_PROCESSED for an unknown message type;
 * - terrno (or TSDB_CODE_SYN_RETURN_VALUE_NULL) if syncIsInit() is false or
 *   the node cannot be acquired.
 *
 * Failures are logged at debug level after the node is released, using a
 * cached vgId. The original dereferenced the released node for that log.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Cached so the failure log below never touches the node after release.
  int32_t vgId = pSyncNode->vgId;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // both timeout kinds share one handler
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  syncNodeRelease(pSyncNode);
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
15,907✔
289
  int32_t    code = 0;
15,907✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,907✔
291
  if (pSyncNode == NULL) {
15,907!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
15,907✔
298
  syncNodeRelease(pSyncNode);
15,906✔
299
  return ret;
15,907✔
300
}
301

302
/*
 * Handle TDMT_SYNC_FORCE_FOLLOWER.
 *
 * Turns the node into a follower, then replies with code 0, carrying the
 * response buffer already attached to the request (info.rsp / info.rspLen).
 * Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
315

316
/*
 * Handle TDMT_SYNC_SET_ASSIGNED_LEADER.
 *
 * The request body (after an SMsgHead) carries arbTerm, arbToken and
 * memberToken.
 * - Requests with a lower arbTerm than the node's, or whose memberToken does
 *   not match ths->arbToken, are skipped and answered with
 *   TSDB_CODE_MND_ARB_TOKEN_MISMATCH.
 * - Otherwise, if the node is not already the assigned leader, it advances its
 *   term, becomes assigned leader and appends a noop entry.
 *
 * A response is always sent. On success it carries a serialized
 * SVArbSetAssignedLeaderRsp echoing the tokens.
 *
 * Fixes relative to the original:
 * - The decoder is bounded to the bytes after SMsgHead.
 * - A freed response buffer is no longer handed to tmsgSendRsp().
 * - Failure responses never advertise a non-zero contLen without a buffer.
 * - terrno is cleared before it is used as raftStoreNextTerm()'s status.
 * - A log-format separator was added.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};

  // The serialized request starts after an SMsgHead, so only the remaining
  // bytes may be decoded; passing the full contLen lets a malformed message
  // push the decoder past the end of pCont.
  int32_t bodyLen = pRpcMsg->contLen - (int32_t)sizeof(SMsgHead);
  if (bodyLen < 0 ||
      tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), bodyLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  // NOTE(review): syncGetArbToken() reads arbToken under arbTokenMutex, but it
  // is compared here without that lock; confirm no concurrent writer exists.
  if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
    sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
          req.memberToken);
    goto _OVER;
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    // raftStoreNextTerm()'s outcome is read from terrno below; clear it first so
    // a stale value from an earlier call on this thread is not taken as failure.
    terrno = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    if ((code = syncNodeAppendNoop(ths)) < 0) {
      // NOTE(review): only logged; code is reset to success below once the
      // response has been serialized.
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    contLen = 0;  // never advertise a negative/absent body
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    contLen = 0;  // no buffer, so no body length
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    pHead = NULL;  // the response below must not reference the freed buffer
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
396

397
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
398
  int32_t    code = 0;
×
399
  SSyncNode* pNode = syncNodeAcquire(rid);
×
400
  if (pNode == NULL) {
×
401
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
402
    if (terrno != 0) code = terrno;
×
403
    TAOS_RETURN(code);
×
404
  }
405

406
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
407
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
408
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
409

410
  syncNodeRelease(pNode);
×
411
  if (ret == 1) {
×
412
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
413
    code = rpcSendResponse(&rpcMsg);
×
414
    return code;
×
415
  } else {
416
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
417
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
418
  }
419
}
420

421
/*
 * Smallest match index across all peers of pSyncNode.
 * Returns SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    if (candidate < lowest) {
      lowest = candidate;
    }
  }
  return lowest;
}
436

437
/* Ask the node's log store which index to retain from, given a retention size in `bytes`. */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retained = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retained;
}
440

441
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
35,348✔
442
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
35,348✔
443
  int32_t    code = 0;
35,347✔
444
  if (pSyncNode == NULL) {
35,347✔
445
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
12✔
446
    if (terrno != 0) code = terrno;
12!
447
    sError("sync begin snapshot error");
12!
448
    TAOS_RETURN(code);
12✔
449
  }
450

451
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
35,335✔
452
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
35,335✔
453
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
35,336✔
454

455
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
35,336!
456
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
105!
457
    syncNodeRelease(pSyncNode);
105✔
458
    return 0;
105✔
459
  }
460

461
  int64_t logRetention = 0;
35,231✔
462

463
  if (syncNodeIsMnode(pSyncNode)) {
35,231✔
464
    // mnode
465
    logRetention = tsMndLogRetention;
3,495✔
466
  } else {
467
    // vnode
468
    if (pSyncNode->replicaNum > 1) {
31,736✔
469
      logRetention = SYNC_VNODE_LOG_RETENTION;
1,409✔
470
    }
471
  }
472

473
  if (pSyncNode->totalReplicaNum > 1) {
35,231✔
474
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
1,780✔
475
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
165!
476
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
18!
477
              lastApplyIndex);
478
      syncNodeRelease(pSyncNode);
18✔
479
      return 0;
18✔
480
    }
481
    SyncIndex retentionIndex =
1,762✔
482
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
1,762✔
483
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
1,762✔
484
  }
485

486
_DEL_WAL:
33,451✔
487

488
  do {
489
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
35,213✔
490
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
35,213✔
491
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
35,212✔
492
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
35,212✔
493
    if (lastApplyIndex <= walCommitVer) {
35,212!
494
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
35,212✔
495

496
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
35,212!
497
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
35,212✔
498
        pSyncNode->snapshottingTime = taosGetTimestampMs();
35,212✔
499

500
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
35,212✔
501
        if (code == 0) {
35,213!
502
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
35,213✔
503
                  pSyncNode->snapshottingIndex, lastApplyIndex);
504
        } else {
505
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
506
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
507
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
508
        }
509

510
      } else {
511
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
512
                snapshottingIndex, lastApplyIndex);
513
      }
514
    }
515
  } while (0);
516

517
  syncNodeRelease(pSyncNode);
35,213✔
518
  TAOS_RETURN(code);
35,211✔
519
}
520

521
int32_t syncEndSnapshot(int64_t rid) {
35,335✔
522
  int32_t    code = 0;
35,335✔
523
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
35,335✔
524
  if (pSyncNode == NULL) {
35,336!
525
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
526
    if (terrno != 0) code = terrno;
×
527
    sError("sync end snapshot error");
×
528
    TAOS_RETURN(code);
×
529
  }
530

531
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
35,336✔
532
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
35,213✔
533
    code = walEndSnapshot(pData->pWal);
35,213✔
534
    if (code != 0) {
35,213!
535
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
536
      syncNodeRelease(pSyncNode);
×
537
      TAOS_RETURN(code);
×
538
    } else {
539
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
35,213✔
540
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
35,213✔
541
    }
542
  }
543

544
  syncNodeRelease(pSyncNode);
35,336✔
545
  TAOS_RETURN(code);
35,336✔
546
}
547

548
/*
 * True when pSyncNode can serve reads: it is leader or assigned leader and
 * has finished restoring.
 *
 * On false, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR (NULL node),
 * TSDB_CODE_SYN_NOT_LEADER, or TSDB_CODE_SYN_RESTORING.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool leading =
      (pSyncNode->state == TAOS_SYNC_STATE_LEADER) || (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!leading) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
567

568
bool syncIsReadyForRead(int64_t rid) {
13,539,878✔
569
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,539,878✔
570
  if (pSyncNode == NULL) {
13,545,021!
571
    sError("sync ready for read error");
×
572
    return false;
×
573
  }
574

575
  bool ready = syncNodeIsReadyForRead(pSyncNode);
13,545,021✔
576

577
  syncNodeRelease(pSyncNode);
13,543,355✔
578
  return ready;
13,540,735✔
579
}
580

581
#ifdef BUILD_NO_CALL
582
bool syncSnapshotSending(int64_t rid) {
583
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
584
  if (pSyncNode == NULL) {
585
    return false;
586
  }
587

588
  bool b = syncNodeSnapshotSending(pSyncNode);
589
  syncNodeRelease(pSyncNode);
590
  return b;
591
}
592

593
bool syncSnapshotRecving(int64_t rid) {
594
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
595
  if (pSyncNode == NULL) {
596
    return false;
597
  }
598

599
  bool b = syncNodeSnapshotRecving(pSyncNode);
600
  syncNodeRelease(pSyncNode);
601
  return b;
602
}
603
#endif
604

605
/*
 * Hand leadership from pSyncNode to one of its peers.
 *
 * - No-op (returns 0) when the node has no peers, is not leader, or
 *   replicaNum <= 1.
 * - Otherwise the target is peersNodeInfo[0]; with exactly two peers,
 *   peersNodeInfo[1] is chosen instead if its match index is strictly higher.
 *
 * Returns syncNodeLeaderTransferTo()'s result when a transfer is attempted.
 * (Declared static at the top of the file, so this definition has internal
 * linkage.)
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (second > first) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}
626

627
/*
 * Propose a leader-transfer entry naming newLeader.
 *
 * Returns:
 * - TSDB_CODE_SYN_INTERNAL_ERROR for a single-replica node;
 * - the error from syncBuildLeaderTransfer() if the message cannot be built;
 * - otherwise syncNodePropose()'s result.
 *
 * The message buffer is freed with rpcFreeCont() after proposing.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pTransfer = rpcMsg.pCont;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newNodeInfo = newLeader;

  int32_t result = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return result;
}
647

648
SSyncState syncGetState(int64_t rid) {
5,497,463✔
649
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
5,497,463✔
650

651
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,497,463✔
652
  if (pSyncNode != NULL) {
5,498,309✔
653
    state.state = pSyncNode->state;
5,498,294✔
654
    state.roleTimeMs = pSyncNode->roleTimeMs;
5,498,294✔
655
    state.startTimeMs = pSyncNode->startTime;
5,498,294✔
656
    state.restored = pSyncNode->restoreFinish;
5,498,294✔
657
    if (pSyncNode->vgId != 1) {
5,498,294✔
658
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,745,875✔
659
    } else {
660
      state.canRead = state.restored;
3,752,419✔
661
    }
662
    /*
663
    double progress = 0;
664
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
665
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
666
      state.progress = (int32_t)(progress * 100);
667
    }
668
    else{
669
      state.progress = -1;
670
    }
671
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
672
            "progress:%lf, progress:%d",
673
          pSyncNode->vgId,
674
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
675
    */
676
    state.term = raftStoreGetTerm(pSyncNode);
5,498,219✔
677
    syncNodeRelease(pSyncNode);
5,498,355✔
678
  }
679

680
  return state;
5,498,235✔
681
}
682

683
int32_t syncGetArbToken(int64_t rid, char* outToken) {
18,141✔
684
  int32_t    code = 0;
18,141✔
685
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,141✔
686
  if (pSyncNode == NULL) {
18,141!
687
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
688
    if (terrno != 0) code = terrno;
×
689
    TAOS_RETURN(code);
×
690
  }
691

692
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
18,141✔
693
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
18,141✔
694
  strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
18,141✔
695
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
18,141✔
696

697
  syncNodeRelease(pSyncNode);
18,141✔
698
  TAOS_RETURN(code);
18,141✔
699
}
700

701
int32_t syncGetAssignedLogSynced(int64_t rid) {
×
702
  int32_t    code = 0;
×
703
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
704
  if (pSyncNode == NULL) {
×
705
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
706
    if (terrno != 0) code = terrno;
×
707
    TAOS_RETURN(code);
×
708
  }
709

710
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
×
711
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
712
    syncNodeRelease(pSyncNode);
×
713
    TAOS_RETURN(code);
×
714
  }
715

716
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
×
717
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
×
718

719
  syncNodeRelease(pSyncNode);
×
720
  TAOS_RETURN(code);
×
721
}
722

723
/*
 * Raise node `rid`'s arbTerm to arbTerm if that is larger; never lowers it.
 * Returns 0, or terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL if the node cannot be
 * acquired.
 */
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  pNode->arbTerm = TMAX(arbTerm, pNode->arbTerm);

  syncNodeRelease(pNode);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
736

737
/*
 * Select the configuration index that applies to a snapshot taken at
 * snapshotLastApplyIndex.
 *
 * Starts from configIndexArr[0] and returns the largest entry that is both
 * greater than that starting value and <= snapshotLastApplyIndex (or
 * configIndexArr[0] itself if none qualifies).
 *
 * Returns -2 and sets terrno = TSDB_CODE_SYN_INTERNAL_ERROR if no config
 * index is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex chosen = pSyncNode->raftCfg.configIndexArr[0];
  // entry 0 can never beat itself, so scanning starts at 1
  for (int32_t k = 1; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = pSyncNode->raftCfg.configIndexArr[k];
    if (candidate > chosen && candidate <= snapshotLastApplyIndex) {
      chosen = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, chosen);
  return chosen;
}
757

758
/*
 * Fill `pEpSet` with the endpoints of every non-learner replica of sync node
 * `rid`, for clients that need to retry elsewhere.
 *
 * numOfEps is reset to 0 first, so the set stays empty if the node cannot be
 * acquired. When at least one endpoint was added, inUse is set to
 * (myIndex + 1) % numOfEps, and the set is then passed to epsetSort().
 * NOTE(review): myIndex indexes nodeInfo *including* learners while eps skips
 * them, and epsetSort() runs afterwards; confirm inUse ends up where callers
 * expect.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  for (int32_t idx = 0; idx < pNode->raftCfg.cfg.totalReplicaNum; idx++) {
    if (pNode->raftCfg.cfg.nodeInfo[idx].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    // numOfEps doubles as the write cursor into eps[]
    SEp* pEp = &pEpSet->eps[pEpSet->numOfEps];
    tstrncpy(pEp->fqdn, pNode->raftCfg.cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pNode->raftCfg.cfg.nodeInfo[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}
783

784
/*
 * Public entry point for proposing a client message to sync node `rid`.
 * Acquires the node, delegates to syncNodePropose() and releases the node.
 * Returns syncNodePropose()'s result, or (through TAOS_RETURN) terrno / a
 * "null node" code when the node cannot be acquired.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) err = terrno;
    sError("sync propose error");
    TAOS_RETURN(err);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
798

799
int32_t syncCheckMember(int64_t rid) {
×
800
  int32_t    code = 0;
×
801
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
802
  if (pSyncNode == NULL) {
×
803
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
804
    if (terrno != 0) code = terrno;
×
805
    sError("sync propose error");
×
806
    TAOS_RETURN(code);
×
807
  }
808

809
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
810
    syncNodeRelease(pSyncNode);
×
811
    return TSDB_CODE_SYN_WRONG_ROLE;
×
812
  }
813

814
  syncNodeRelease(pSyncNode);
×
815
  return 0;
×
816
}
817

818
int32_t syncIsCatchUp(int64_t rid) {
6,144✔
819
  int32_t    code = 0;
6,144✔
820
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,144✔
821
  if (pSyncNode == NULL) {
6,144!
822
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
823
    if (terrno != 0) code = terrno;
×
824
    sError("sync Node Acquire error since %d", errno);
×
825
    TAOS_RETURN(code);
×
826
  }
827

828
  int32_t isCatchUp = 0;
6,144✔
829
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
6,144!
830
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,386!
831
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,386✔
832
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
5,834!
833
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
834
          pSyncNode->pLogBuf->matchIndex);
835
    isCatchUp = 0;
5,834✔
836
  } else {
837
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
310!
838
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
839
    isCatchUp = 1;
310✔
840
  }
841

842
  syncNodeRelease(pSyncNode);
6,144✔
843
  return isCatchUp;
6,144✔
844
}
845

846
ESyncRole syncGetRole(int64_t rid) {
6,144✔
847
  int32_t    code = 0;
6,144✔
848
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,144✔
849
  if (pSyncNode == NULL) {
6,144!
850
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
851
    if (terrno != 0) code = terrno;
×
852
    sError("sync Node Acquire error since %d", errno);
×
853
    TAOS_RETURN(code);
×
854
  }
855

856
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
6,144✔
857

858
  syncNodeRelease(pSyncNode);
6,144✔
859
  return role;
6,144✔
860
}
861

862
int64_t syncGetTerm(int64_t rid) {
5,852✔
863
  int32_t    code = 0;
5,852✔
864
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,852✔
865
  if (pSyncNode == NULL) {
5,852!
866
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
867
    if (terrno != 0) code = terrno;
×
868
    sError("sync Node Acquire error since %d", errno);
×
869
    TAOS_RETURN(code);
×
870
  }
871

872
  int64_t term = raftStoreGetTerm(pSyncNode);
5,852✔
873

874
  syncNodeRelease(pSyncNode);
5,852✔
875
  return term;
5,852✔
876
}
877

878
/*
 * Propose a client message on an already-acquired sync node.
 *
 * Rejection checks:
 *  - Not leader or assigned leader: returns TSDB_CODE_SYN_NOT_LEADER.
 *  - Restore not finished: returns TSDB_CODE_SYN_PROPOSE_NOT_READY.
 *  - Plain leader whose heartbeat replies timed out: also returns
 *    TSDB_CODE_SYN_PROPOSE_NOT_READY.
 *
 * Optimized one-replica path (syncNodeIsOptimizedOneReplica):
 *  - The request is handled by syncNodeOnClientRequest(), and its index/term
 *    are written to pMsg->info.conn.
 *  - Returns 1 while the node still has one replica, and 0 if the replica
 *    count has changed.
 *  - Returns TSDB_CODE_SYN_INTERNAL_ERROR if the request fails.
 *
 * Normal path:
 *  - A response stub is registered in pSyncRespMgr, pMsg is wrapped into a
 *    client request, and the request is enqueued through syncEqMsg.
 *  - If serialization fails, the stub is removed and the serialization error
 *    is returned.
 *  - If enqueueing fails, the stub is removed and the enqueue error is
 *    returned. *seq (if non-NULL) still receives the sequence number in this
 *    case, unless removing the stub also failed.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout (not checked for an assigned leader)
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized because it is logged on the failure path, where the callee
    // may not have written it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s since %s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType), tstrerror(code));
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
           " type:%s, "
           "handle:%p",
           pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, tstrerror(code));
    // Remove the stub registered above, but keep returning the build error:
    // a successful removal must not turn this failure into a success.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub after serialize failure since %s", pSyncNode->vgId,
             tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, tstrerror(code));
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
950

951
/*
 * Reset one per-peer heartbeat timer descriptor.
 *
 * Sets the following fields:
 *  - destId: the target peer.
 *  - timerCb: syncNodeEqPeerHeartbeatTimer.
 *  - timerMS: the node's heartbeat baseline.
 *  - timeStamp: the current time.
 *  - counter and logic clock: zero.
 *  - pTimer: NULL (no timer armed).
 *
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}
961

962
/*
 * Arm the heartbeat timer for one peer.
 *
 * Timer data handling:
 *  - Reuses the SSyncHbTimerData registered under pSyncTimer->hbDataRid when
 *    it can be acquired.
 *  - Otherwise allocates a new one and registers it with
 *    syncHbTimerDataAdd().
 *  - The data records the owning node rid, destination, logic clock and
 *    expected execution time.
 *  - Its rid is passed to the timer callback as the parameter.
 *
 * Returns:
 *  - 0 on success.
 *  - TSDB_CODE_OUT_OF_MEMORY if the timer data cannot be allocated.
 *  - TSDB_CODE_SYN_INTERNAL_ERROR if the sync env is not initialized, or if
 *    taosTmrReset() returns true.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // the fields below would otherwise be written through a NULL pointer
      sError("vgId:%d, failed to alloc hb timer data", pSyncNode->vgId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
         pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager,
                              &pSyncTimer->pTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return 0;
}
995

996
/*
 * Stop one per-peer heartbeat timer and unregister its timer data.
 *
 * Steps:
 *  1. Advance the timer's logic clock.
 *  2. Stop the OS timer and clear pTimer.
 *  3. Remove the data registered under hbDataRid and reset hbDataRid to -1.
 *
 * NOTE(review): advancing the logic clock presumably lets late callbacks
 * detect that the timer was stopped; confirm in
 * syncNodeEqPeerHeartbeatTimer. Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1006

1007
/*
 * Rebuild the log store from the FSM snapshot when the log no longer lines
 * up with it.
 *
 * A restore happens when any of the following holds:
 *  - the log ends before the snapshot;
 *  - the log starts after snapshot + 1;
 *  - the FSM state is SYNC_FSM_STATE_INCOMPLETE.
 *
 * Returns:
 *  - TSDB_CODE_SYN_INTERNAL_ERROR when the log store, FSM or
 *    FpGetSnapshotInfo is missing.
 *  - Otherwise the restore result (0 when no restore is needed).
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapshotVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logBehindSnapshot = lastVer < snapshotVer;
  bool gapAfterSnapshot = firstVer > snapshotVer + 1;
  bool fsmIncomplete = pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;
  if (!logBehindSnapshot && !gapAfterSnapshot && !fsmIncomplete) {
    TAOS_RETURN(0);
  }

  int32_t rc = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
  if (rc != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
  }
  TAOS_RETURN(rc);
}
1037

1038
// open/close --------------
1039
/*
 * Create and initialize a sync node from `pSyncInfo`.
 *
 * Steps:
 *  1. Ensure the sync directory exists.
 *  2. Create raft_config.json, or load it and reconcile it with the caller's
 *     config according to `vnodeVersion`.
 *  3. Refresh replica endpoints through tmsgUpdateDnodeInfo().
 *  4. Build, in order:
 *     - the log buffer;
 *     - replica/peer tables;
 *     - raft store;
 *     - vote and index managers;
 *     - log store;
 *     - timers;
 *     - response manager;
 *     - snapshot senders and receiver;
 *     - replication manager and peer state.
 *
 * Ownership: pSyncInfo->pFsm is moved into the node and cleared in pSyncInfo.
 *
 * On failure:
 *  - an FSM still held by pSyncInfo is freed;
 *  - the partially built node is released with syncNodeClose();
 *  - terrno is set where an error code is available;
 *  - NULL is returned.
 *
 * @param pSyncInfo     open options. syncCfg may be overwritten with the
 *                      config stored in raft_config.json.
 * @param vnodeVersion  compared with the stored config's changeVersion, to
 *                      decide whether the caller's config may overwrite it.
 * @return the new node, or NULL on failure
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // log the stored (file) version the request was actually compared against
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    if (code != 0) terrno = code;
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm; the FSM is now owned by the node
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;  // the module's error channel is terrno, not errno
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // release the half-built node (and the FSM it now owns) via _error
      terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to create snapshot sender %d since %s", pSyncNode->vgId, i, terrstr());
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to create snapshot receiver since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // an FSM not yet moved into the node is still owned here
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1402

1403
#ifdef BUILD_NO_CALL
/*
 * Advance commitIndex to the FSM snapshot's lastApplyIndex when the snapshot
 * is ahead of it. This is a no-op when no FSM or no FpGetSnapshotInfo
 * callback is registered. Only compiled with BUILD_NO_CALL.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}
#endif
1414

1415
/*
 * Re-establish the node's commit index after open and re-commit the log
 * buffer up to it.
 *
 * Steps:
 *  - Under the log buffer mutex, sample the log store's last and commit
 *    indexes and the buffer's end index.
 *  - A gap between the log store and the buffer is only warned about.
 *  - commitIndex is raised to the log store's commit index.
 *  - Unless the FSM state is SYNC_FSM_STATE_INCOMPLETE, entries are committed
 *    through syncLogBufferCommit().
 *
 * Returns:
 *  - TSDB_CODE_SYN_INTERNAL_ERROR when the log store or log buffer is
 *    missing.
 *  - Otherwise the commit result, or 0 when the commit is skipped.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    // Non-fatal: the early return here was deliberately disabled. Report the
    // specific code, but do not let it leak into the function's result (it
    // previously did whenever the commit below was skipped).
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
          pSyncNode->vgId, tstrerror(TSDB_CODE_WAL_LOG_INCOMPLETE), endIndex - 1, lastVer);
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) {
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1450

1451
/*
 * Put a freshly opened sync node into its initial role and start the ping
 * timer.
 *
 * Initial role:
 *  - Configured as learner: becomes learner.
 *  - Single replica: bumps the term, becomes leader and appends a no-op
 *    entry. If the no-op append fails, that error is returned immediately.
 *  - Otherwise: becomes follower.
 *
 * Returns the ping-timer start result; a failure there is logged.
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;
  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    syncNodeBecomeFollower(pSyncNode, "first start");
  }

  int32_t rc = syncNodeStartPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(rc));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return rc;
}
1476

1477
#ifdef BUILD_NO_CALL
/*
 * Put the node into follower state as a stand-by member.
 *
 * Stops the heartbeat timers, restarts the election timer with TIMER_MAX_MS
 * (long enough), then (re)starts the ping timer.
 *
 * Returns 0 on success, or the error code of the first failing timer
 * operation (terrno is set too). The failing step is logged with the actual
 * code rather than whatever terrno happened to hold.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (code != 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    TAOS_RETURN(code);
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    TAOS_RETURN(code);
  }
  return code;
}
#endif
1502

1503
/*
 * First stage of closing a sync node.
 *
 * Stops the elect, heartbeat and ping timers, then cleans up pending
 * responses in the response manager. Returns early only when the node, its
 * FSM or the FSM's FpApplyQueueItems callback is missing.
 *
 * Stopping timers is best-effort: a failure on one timer is logged and the
 * remaining timers are still stopped and responses still cleaned. An early
 * return would leave those steps undone.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
  }

  // stop heartbeat timer
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
  }

  // stop ping timer
  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1539

1540
/*
 * Post-close step: if a new-node snapshot receiver is attached, stop it when
 * it is still running, destroy it and clear pNewNodeReceiver.
 * Does nothing when no receiver is attached.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
1552

1553
/* Free callback for SSyncHbTimerData objects: releases pData via taosMemoryFree. */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
1554

1555
/*
 * Close pSyncNode and free it. A NULL node is ignored.
 *
 * Cleans pending responses and stops the ping, elect and heartbeat timers.
 * Then it destroys everything the node owns:
 *   - log replication state;
 *   - response manager;
 *   - vote-granted / vote-respond managers;
 *   - next/match index managers;
 *   - log store and log buffer;
 *   - arbToken mutex;
 *   - snapshot senders and the new-node snapshot receiver;
 *   - FSM and raft store.
 * Finally it frees the node itself.
 *
 * Timer-stop failures are logged but do not abort the teardown. Returning
 * early there would leak the node and every resource listed above.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
  }
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
  }
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
  }
  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (pSyncNode->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);

      if (snapshotSenderIsStart(pSyncNode->senders[i])) {
        snapshotSenderStop(pSyncNode->senders[i], false);
      }

      snapshotSenderDestroy(pSyncNode->senders[i]);
      pSyncNode->senders[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1624

1625
/* Return the snapshot strategy stored in the node's raft config. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  ESyncStrategy strategy = pSyncNode->raftCfg.snapshotStrategy;
  return strategy;
}
1626

1627
// timer control --------------
1628
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
15,907✔
1629
  int32_t code = 0;
15,907✔
1630
  if (syncIsInit()) {
15,907!
1631
    bool stopped = taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
15,907✔
1632
                                syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
15,907✔
1633
    if (stopped) {
15,907!
1634
      sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
×
1635
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1636
    }
1637
    atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
15,907✔
1638
  } else {
1639
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
×
1640
  }
1641
  return code;
15,907✔
1642
}
1643

1644
/*
 * Cancel the ping timer.
 *
 * Steps: advance pingTimerLogicClockUser, stop the timer with taosTmrStop,
 * then clear pPingTimer. Always returns 0.
 * NOTE(review): the clock bump presumably lets an already-queued callback
 * detect that it is stale -- confirm against FpPingTimerCB.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1652

1653
/*
 * Arm the election timer to fire in `ms` milliseconds.
 *
 * Before resetting the timer (node rid as callback parameter), it:
 *   - stores ms in electTimerMS;
 *   - fills electTimerParam: absolute execute time, current
 *     electTimerLogicClock, owning node, no extra data.
 * A true result from taosTmrReset, or an uninitialised sync environment, is
 * only logged. The function always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadlineMs = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadlineMs);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool resetFailed = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                                  syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (resetFailed) {
    sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1672

1673
/*
 * Cancel the election timer.
 *
 * Steps: advance electTimerLogicClock, stop the timer with taosTmrStop, then
 * clear pElectTimer. Always returns 0.
 * NOTE(review): the clock bump presumably invalidates an already-queued
 * callback -- confirm against FpElectTimerCB.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1682

1683
/*
 * Stop the election timer and arm it again for `ms` milliseconds.
 * Returns the first error from either step (terrno set), otherwise 0.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1689

1690
/*
 * Restart the election timer with a fresh timeout.
 *
 * The timeout is TIMER_MAX_MS for a stand-by node. Otherwise it is
 * syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine). A restart failure
 * is logged and the trace message is skipped.
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t lowMs = pSyncNode->electBaseLine;
  int32_t electMS = pSyncNode->raftCfg.isStandBy ? TIMER_MAX_MS : syncUtilElectRandomMS(lowMs, 2 * lowMs);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1709

1710
#ifdef BUILD_NO_CALL
/*
 * Arm the node-wide heartbeat timer: heartbeatTimerMS, FpHeartbeatTimerCB,
 * the node's rid as callback parameter.
 *
 * taosTmrReset returns a bool rather than an error code, so a true result is
 * mapped to TSDB_CODE_SYN_INTERNAL_ERROR (as syncNodeStartPingTimer does)
 * instead of being passed through TAOS_CHECK_RETURN as the value 1.
 * An uninitialised sync environment is only logged.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
#endif
1725

1726
/*
 * Start the heartbeat timer of every peer that has one (per-peer timers
 * looked up with syncNodeGetHbTimer). Peers without a timer are skipped.
 * Returns the first syncHbTimerStart error (terrno set), otherwise 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pTimer));
  }
  return 0;
}
1743

1744
/*
 * Stop the heartbeat timer of every peer that has one (per-peer timers
 * looked up with syncNodeGetHbTimer). Peers without a timer are skipped.
 * Returns the first syncHbTimerStop error (terrno set), otherwise 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer != NULL) {
      TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pTimer));
    }
    ++peer;
  }
  return 0;
}
1763

1764
#ifdef BUILD_NO_CALL
/*
 * Stop and then restart all per-peer heartbeat timers.
 * Returns the first error from either step (terrno set), otherwise 0.
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
#endif
1773

1774
/*
 * Send pMsg to the peer whose address matches destRaftId->addr.
 *
 * When the peer is known and a send callback is installed, the message body
 * is converted with syncUtilMsgHtoN, marked noResp and handed to
 * pNode->syncSendMSg.
 *
 * Failure cases:
 *   - unknown peer, or no send callback -> TSDB_CODE_SYN_INTERNAL_ERROR;
 *   - the callback returns a negative value -> that value.
 * On any failure the body is freed with rpcFreeCont and pMsg->pCont is set
 * to NULL, so callers cannot touch or free it again.
 *
 * Returns 0 / the callback result on success, or the error code (terrno set).
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
    if (destRaftId->addr == pNode->peersId[i].addr) {
      epSet = &pNode->peersEpset[i];
      break;
    }
  }

  // Use a real error code (negative as int32) instead of a bare -1, so that
  // tstrerror() and terrno carry meaningful information.
  int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code),
           epSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  TAOS_RETURN(code);
}
1798

1799
/*
 * Report whether this node is a member of pCfg.
 *
 * Membership is checked two independent ways:
 *   - by FQDN/port against myNodeInfo;
 *   - by raft id (SYNC_ADDR of the node info + this vgId) against myRaftId.
 * If the two checks disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR
 * and false is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEp = false;
  for (int32_t idx = 0; !foundByEp && idx < pCfg->totalReplicaNum; ++idx) {
    foundByEp = strcmp(pCfg->nodeInfo[idx].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                pCfg->nodeInfo[idx].nodePort == pNode->myNodeInfo.nodePort;
  }

  bool foundById = false;
  for (int32_t idx = 0; !foundById && idx < pCfg->totalReplicaNum; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };
    foundById = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (foundByEp != foundById) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return foundByEp;
}
1829

1830
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,540✔
1831
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,540✔
1832
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,389✔
1833
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,598✔
1834
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
4,014✔
1835
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
4,014✔
1836
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
4,014!
1837
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
4,014✔
1838
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
4,012✔
1839
  }
1840

1841
  return false;
1,584✔
1842
}
1843

1844
/*
 * Apply a new sync configuration to pSyncNode.
 *
 * Returns 0 without doing anything when syncIsConfigChanged reports no
 * difference. Otherwise it:
 *   1. installs pNewConfig and records lastConfigChangeIndex;
 *   2. clears the stand-by flag when this node is in the new config, and sets
 *      it when the node was dropped (in the old config but not the new one);
 *   3. appends lastConfigChangeIndex to configIndexArr, failing with
 *      TSDB_CODE_OUT_OF_RANGE when the array is full;
 *   4. if this node is in the new config, rebuilds:
 *        - my/peer/replica ids and peer endpoint sets;
 *        - replica counts and quorum;
 *        - index and vote managers;
 *        - snapshot senders (reusing the sender of every replica that existed
 *          before, creating the rest, destroying the leftovers);
 *   5. persists the config with syncWriteCfgFile.
 *
 * Returns 0 on success or an error code (terrno is set as well).
 */
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  int32_t  code = 0;
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change; arguments must line up with the conversions:
  // %d, %d, PRId64, PRId64, %d
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move the sender of every replica that also existed before
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new; on failure stop creating but still free the old senders
    // below so they are not leaked
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        code = snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
        if (code != 0) {
          break;
        }
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    if (code != 0) {
      TAOS_RETURN(code);
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2016

2017
// raft state change --------------
2018
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
85,022✔
2019
  if (term > raftStoreGetTerm(pSyncNode)) {
85,022!
2020
    raftStoreSetTerm(pSyncNode, term);
×
2021
  }
2022
}
85,022✔
2023

2024
/*
 * Step down after observing term newTerm.
 *
 *   - newTerm older than the current term: ignored (trace only).
 *   - An assigned leader regenerates its arbToken under arbTokenMutex.
 *   - newTerm newer: store it, become follower and clear the recorded vote.
 *   - Same term: become follower unless already a follower.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, reason);
    raftStoreClearVote(pSyncNode);
    return;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}
2054

2055
/* Clean pending responses in the node's response manager after a role change. */
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
2056

2057
/*
 * Switch pSyncNode to the follower role.
 *
 * Steps, in order:
 *   1. clear leaderCache if the node was leader, and reset hbSlowNum;
 *   2. set state/roleTimeMs and stop the heartbeat timers;
 *   3. clean pending responses (syncNodeLeaderChangeRsp);
 *   4. invoke FpBecomeFollowerCb when installed;
 *   5. invalidate minMatchIndex and reset the log buffer;
 *   6. restart the election timer.
 * If stopping heartbeats or resetting the log buffer fails, the error is
 * logged and the remaining steps are skipped. debugStr is used only in logs.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;

  // leaderCache holds our own id while we are leader; drop it when leaving
  bool wasLeader = pSyncNode->state == TAOS_SYNC_STATE_LEADER;
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "become follower %s", debugStr);

  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasCallback = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL;
  if (hasCallback) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2098

2099
/*
 * Switch pSyncNode to the learner role.
 *
 * Steps, in order:
 *   1. reset hbSlowNum and set state/roleTimeMs;
 *   2. invoke FpBecomeLearnerCb when installed;
 *   3. invalidate minMatchIndex and reset the log buffer (a failure there is
 *      logged).
 * debugStr is used only in logs.
 */
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasCallback = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL;
  if (hasCallback) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2124

2125
// TLA+ Spec
2126
// \* Candidate i transitions to leader.
2127
// BecomeLeader(i) ==
2128
//     /\ state[i] = Candidate
2129
//     /\ votesGranted[i] \in Quorum
2130
//     /\ state'      = [state EXCEPT ![i] = Leader]
2131
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2132
//                          [j \in Server |-> Len(log[i]) + 1]]
2133
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2134
//                          [j \in Server |-> 0]]
2135
//     /\ elections'  = elections \cup
2136
//                          {[eterm     |-> currentTerm[i],
2137
//                            eleader   |-> i,
2138
//                            elog      |-> log[i],
2139
//                            evotes    |-> votesGranted[i],
2140
//                            evoterLog |-> voterLog[i]]}
2141
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2142
//
2143
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
13,500✔
2144
  int32_t code = 0;
13,500✔
2145
  pSyncNode->becomeLeaderNum++;
13,500✔
2146
  pSyncNode->hbrSlowNum = 0;
13,500✔
2147

2148
  // reset restoreFinish
2149
  pSyncNode->restoreFinish = false;
13,500✔
2150

2151
  // state change
2152
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
13,500✔
2153
  pSyncNode->roleTimeMs = taosGetTimestampMs();
13,500✔
2154

2155
  // set leader cache
2156
  pSyncNode->leaderCache = pSyncNode->myRaftId;
13,500✔
2157

2158
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
29,006✔
2159
    SyncIndex lastIndex;
2160
    SyncTerm  lastTerm;
2161
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
15,506✔
2162
    if (code != 0) {
15,506!
2163
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
×
2164
      return;
×
2165
    }
2166
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
15,506✔
2167
  }
2168

2169
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
29,006✔
2170
    // maybe overwrite myself, no harm
2171
    // just do it!
2172
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
15,506✔
2173
  }
2174

2175
  // init peer mgr
2176
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
13,500!
2177
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2178
    return;
×
2179
  }
2180

2181
#if 0
2182
  // update sender private term
2183
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
2184
  if (pMySender != NULL) {
2185
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
2186
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
2187
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
2188
      }
2189
    }
2190
    (pMySender->privateTerm) += 100;
2191
  }
2192
#endif
2193

2194
  // close receiver
2195
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
13,500!
2196
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2197
  }
2198

2199
  // stop elect timer
2200
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
13,500!
2201
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2202
    return;
×
2203
  }
2204

2205
  // start heartbeat timer
2206
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
13,500!
2207
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2208
    return;
×
2209
  }
2210

2211
  // send heartbeat right now
2212
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
13,500!
2213
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2214
    return;
×
2215
  }
2216

2217
  // call back
2218
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
13,500!
2219
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
13,500✔
2220
  }
2221

2222
  // min match index
2223
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
13,500✔
2224

2225
  // reset log buffer
2226
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
13,500!
2227
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2228
    return;
×
2229
  }
2230

2231
  // trace log
2232
  sNInfo(pSyncNode, "become leader %s", debugStr);
13,500!
2233
}
2234

2235
/*
 * Switch this node into TAOS_SYNC_STATE_ASSIGNED_LEADER.
 *
 * Bumps becomeAssignedLeaderNum, records the role change time, points the leader
 * cache at ourselves, resets every peer's next/match index and peer send state,
 * stops a running new-node snapshot receiver, swaps the election timer for the
 * heartbeat timer, sends a heartbeat immediately, invokes the FSM's
 * FpBecomeAssignedLeaderCb (if any) and resets the log buffer.
 *
 * Any failing step is logged and aborts the remaining steps; note that the state
 * field has already been changed by then.
 */
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  // pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Every peer's next index starts right after our last entry. The last index does
  // not depend on the peer, so it is fetched once rather than once per replica
  // (and without shadowing the function-level `code`).
  if (pSyncNode->pNextIndex->replicaNum > 0) {
    SyncIndex lastIndex = SYNC_INDEX_INVALID;
    SyncTerm  lastTerm = 0;
    if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
      pSyncNode->pNextIndex->index[i] = lastIndex + 1;
    }
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2313

2314
/*
 * Promote a candidate that has won a majority of votes to leader.
 *
 * Refuses (with an error log) if the node is not a candidate or the granted votes
 * are not a majority. On success it calls syncNodeBecomeLeader, appends a noop
 * entry and logs the resulting term / commit index / last index.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    // Report the code returned by the call itself (as syncNodeAssignedLeader2Leader
    // does) rather than whatever terrno happens to hold.
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
2338

2339
// Returns true when this sync node's vgId is 1, which this check treats as the mnode group.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  return pSyncNode->vgId == mnodeVgId;
}
445,075✔
2340

2341
// Reset the per-peer send bookkeeping for every possible replica slot (voters and
// learners): no index has been sent yet and the last send time is zero.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int32_t slot = 0; slot < slotCount; slot++) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }

  return 0;
}
2349

2350
// Move a follower into the candidate state and record the role change time.
// A node that is not currently a follower is left untouched and an error is logged.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
    pSyncNode->roleTimeMs = taosGetTimestampMs();

    SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
          pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

    sNTrace(pSyncNode, "follower to candidate");
  } else {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
  }
}
2363

2364
/*
 * Promote an assigned leader to a regular leader.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if the node is not in
 * TAOS_SYNC_STATE_ASSIGNED_LEADER; otherwise calls syncNodeBecomeLeader, appends a
 * noop entry (a failure is only logged) and returns 0.
 */
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // The ", " separator before "assigned commit index" was missing, which glued the
  // commit index and the following label together in the log line.
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2382

2383
// just called by syncNodeVoteForSelf
2384
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,341✔
2385
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,341✔
2386
  if (term != storeTerm) {
1,341!
2387
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2388
    return;
×
2389
  }
2390
  bool voted = raftStoreHasVoted(pSyncNode);
1,341✔
2391
  if (voted) {
1,341!
2392
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2393
    return;
×
2394
  }
2395

2396
  raftStoreVote(pSyncNode, pRaftId);
1,341✔
2397
}
2398

2399
// simulate get vote from outside
2400
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,341✔
2401
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,341✔
2402

2403
  SRpcMsg rpcMsg = {0};
1,341✔
2404
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,341✔
2405
  if (ret != 0) return;
1,341!
2406

2407
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,341✔
2408
  pMsg->srcId = pSyncNode->myRaftId;
1,341✔
2409
  pMsg->destId = pSyncNode->myRaftId;
1,341✔
2410
  pMsg->term = currentTerm;
1,341✔
2411
  pMsg->voteGranted = true;
1,341✔
2412

2413
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,341✔
2414
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,341✔
2415
  rpcFreeCont(rpcMsg.pCont);
1,341✔
2416
}
2417

2418
// return if has a snapshot
2419
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
20,201✔
2420
  bool      ret = false;
20,201✔
2421
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
20,201✔
2422
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
20,201!
2423
    // TODO check return value
2424
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
20,201✔
2425
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
20,201✔
2426
      ret = true;
2,712✔
2427
    }
2428
  }
2429
  return ret;
20,201✔
2430
}
2431

2432
// return max(logLastIndex, snapshotLastIndex)
2433
// if no snapshot and log, return -1
2434
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
20,202✔
2435
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
20,202✔
2436
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
20,202!
2437
    // TODO check return value
2438
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
20,202✔
2439
  }
2440
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
20,202✔
2441

2442
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
20,202✔
2443
  return lastIndex;
20,202✔
2444
}
2445

2446
// return the last term of snapshot and log
2447
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2448
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
20,201✔
2449
  SyncTerm lastTerm = 0;
20,201✔
2450
  if (syncNodeHasSnapshot(pSyncNode)) {
20,201✔
2451
    // has snapshot
2452
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,712✔
2453
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,712!
2454
      // TODO check return value
2455
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,712✔
2456
    }
2457

2458
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,712✔
2459
    if (logLastIndex > snapshot.lastApplyIndex) {
2,712✔
2460
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,659✔
2461
    } else {
2462
      lastTerm = snapshot.lastApplyTerm;
1,053✔
2463
    }
2464

2465
  } else {
2466
    // no snapshot
2467
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
17,489✔
2468
  }
2469

2470
  return lastTerm;
20,201✔
2471
}
2472

2473
// get last index and term along with snapshot
2474
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
17,957✔
2475
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
17,957✔
2476
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
17,957✔
2477
  return 0;
17,957✔
2478
}
2479

2480
#ifdef BUILD_NO_CALL
2481
// return append-entries first try index
2482
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2483
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2484
  return syncStartIndex;
2485
}
2486

2487
// if index > 0, return index - 1
2488
// else, return -1
2489
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2490
  SyncIndex preIndex = index - 1;
2491
  if (preIndex < SYNC_INDEX_INVALID) {
2492
    preIndex = SYNC_INDEX_INVALID;
2493
  }
2494

2495
  return preIndex;
2496
}
2497

2498
// if index < 0, return SYNC_TERM_INVALID
2499
// if index == 0, return 0
2500
// if index > 0, return preTerm
2501
// if error, return SYNC_TERM_INVALID
2502
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2503
  if (index < SYNC_INDEX_BEGIN) {
2504
    return SYNC_TERM_INVALID;
2505
  }
2506

2507
  if (index == SYNC_INDEX_BEGIN) {
2508
    return 0;
2509
  }
2510

2511
  SyncTerm  preTerm = 0;
2512
  SyncIndex preIndex = index - 1;
2513

2514
  SSyncRaftEntry* pPreEntry = NULL;
2515
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2516
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2517
  int32_t         code = 0;
2518
  if (h) {
2519
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2520
    code = 0;
2521

2522
    pSyncNode->pLogStore->cacheHit++;
2523
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2524

2525
  } else {
2526
    pSyncNode->pLogStore->cacheMiss++;
2527
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2528

2529
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2530
  }
2531

2532
  SSnapshot snapshot = {.data = NULL,
2533
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2534
                        .lastApplyTerm = SYNC_TERM_INVALID,
2535
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2536

2537
  if (code == 0) {
2538
    if (pPreEntry == NULL) return -1;
2539
    preTerm = pPreEntry->term;
2540

2541
    if (h) {
2542
      taosLRUCacheRelease(pCache, h, false);
2543
    } else {
2544
      syncEntryDestroy(pPreEntry);
2545
    }
2546

2547
    return preTerm;
2548
  } else {
2549
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2550
      // TODO check return value
2551
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2552
      if (snapshot.lastApplyIndex == preIndex) {
2553
        return snapshot.lastApplyTerm;
2554
      }
2555
    }
2556
  }
2557

2558
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2559
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2560
  return SYNC_TERM_INVALID;
2561
}
2562

2563
// get pre index and term of "index"
2564
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  // Fill both out-parameters for the entry preceding `index`; always succeeds.
  SyncIndex prevIdx = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIdx;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}
2569
#endif
2570

2571
static void syncNodeEqPingTimer(void* param, void* tmrId) {
205,220✔
2572
  if (!syncIsInit()) return;
205,220!
2573

2574
  int64_t    rid = (int64_t)param;
205,220✔
2575
  SSyncNode* pNode = syncNodeAcquire(rid);
205,220✔
2576

2577
  if (pNode == NULL) return;
205,220!
2578

2579
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
205,220!
2580
    SRpcMsg rpcMsg = {0};
205,220✔
2581
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
205,220✔
2582
                                    pNode->pingTimerMS, pNode);
2583
    if (code != 0) {
205,220!
2584
      sError("failed to build ping msg");
×
2585
      rpcFreeCont(rpcMsg.pCont);
×
2586
      goto _out;
×
2587
    }
2588

2589
    // sTrace("enqueue ping msg");
2590
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
205,220✔
2591
    if (code != 0) {
205,220!
2592
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
2593
      rpcFreeCont(rpcMsg.pCont);
×
2594
      goto _out;
×
2595
    }
2596

2597
  _out:
205,220✔
2598
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
205,220!
2599
                     &pNode->pPingTimer))
2600
      sError("failed to reset ping timer");
×
2601
  }
2602
  syncNodeRelease(pNode);
205,220✔
2603
}
2604

2605
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,344✔
2606
  if (!syncIsInit()) return;
1,347!
2607

2608
  int64_t    rid = (int64_t)param;
1,344✔
2609
  SSyncNode* pNode = syncNodeAcquire(rid);
1,344✔
2610

2611
  if (pNode == NULL) return;
1,344✔
2612

2613
  if (pNode->syncEqMsg == NULL) {
1,343!
2614
    syncNodeRelease(pNode);
×
2615
    return;
×
2616
  }
2617

2618
  int64_t tsNow = taosGetTimestampMs();
1,343✔
2619
  if (tsNow < pNode->electTimerParam.executeTime) {
1,343✔
2620
    syncNodeRelease(pNode);
2✔
2621
    return;
2✔
2622
  }
2623

2624
  SRpcMsg rpcMsg = {0};
1,341✔
2625
  int32_t code =
2626
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,341✔
2627

2628
  if (code != 0) {
1,341!
2629
    sError("failed to build elect msg");
×
2630
    syncNodeRelease(pNode);
×
2631
    return;
×
2632
  }
2633

2634
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,341✔
2635
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,341!
2636

2637
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,341✔
2638
  if (code != 0) {
1,341!
2639
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2640
    rpcFreeCont(rpcMsg.pCont);
×
2641
    syncNodeRelease(pNode);
×
2642
    return;
×
2643
  }
2644

2645
  syncNodeRelease(pNode);
1,341✔
2646
}
2647

2648
#ifdef BUILD_NO_CALL
2649
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2650
  if (!syncIsInit()) return;
2651

2652
  int64_t    rid = (int64_t)param;
2653
  SSyncNode* pNode = syncNodeAcquire(rid);
2654

2655
  if (pNode == NULL) return;
2656

2657
  if (pNode->totalReplicaNum > 1) {
2658
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2659
      SRpcMsg rpcMsg = {0};
2660
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2661
                                      pNode->heartbeatTimerMS, pNode);
2662

2663
      if (code != 0) {
2664
        sError("failed to build heartbeat msg");
2665
        goto _out;
2666
      }
2667

2668
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2669
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2670
      if (code != 0) {
2671
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2672
        rpcFreeCont(rpcMsg.pCont);
2673
        goto _out;
2674
      }
2675

2676
    _out:
2677
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2678
                       &pNode->pHeartbeatTimer) != 0)
2679
        return;
2680

2681
    } else {
2682
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2683
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2684
    }
2685
  }
2686
}
2687
#endif
2688

2689
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
46,220✔
2690
  int32_t code = 0;
46,220✔
2691
  int64_t hbDataRid = (int64_t)param;
46,220✔
2692
  int64_t tsNow = taosGetTimestampMs();
46,220✔
2693

2694
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
46,220✔
2695
  if (pData == NULL) {
46,220!
2696
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2697
    return;
×
2698
  }
2699

2700
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
46,220✔
2701
  if (pSyncNode == NULL) {
46,220✔
2702
    syncHbTimerDataRelease(pData);
3✔
2703
    sError("hb timer get pSyncNode NULL");
3!
2704
    return;
3✔
2705
  }
2706

2707
  SSyncTimer* pSyncTimer = pData->pTimer;
46,217✔
2708

2709
  if (!pSyncNode->isStart) {
46,217!
2710
    syncNodeRelease(pSyncNode);
×
2711
    syncHbTimerDataRelease(pData);
×
2712
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2713
    return;
×
2714
  }
2715

2716
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
46,217!
2717
    syncNodeRelease(pSyncNode);
×
2718
    syncHbTimerDataRelease(pData);
×
2719
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2720
    return;
×
2721
  }
2722

2723
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
46,217!
2724
         pData->destId.addr);
2725

2726
  if (pSyncNode->totalReplicaNum > 1) {
46,217✔
2727
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
46,215✔
2728
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
46,215✔
2729

2730
    if (timerLogicClock == msgLogicClock) {
46,215✔
2731
      if (tsNow > pData->execTime) {
46,212✔
2732
        pData->execTime += pSyncTimer->timerMS;
46,188✔
2733

2734
        SRpcMsg rpcMsg = {0};
46,188✔
2735
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
46,188!
2736
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2737
          syncNodeRelease(pSyncNode);
×
2738
          syncHbTimerDataRelease(pData);
×
2739
          return;
×
2740
        }
2741

2742
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
46,188✔
2743

2744
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
46,188✔
2745
        pSyncMsg->srcId = pSyncNode->myRaftId;
46,188✔
2746
        pSyncMsg->destId = pData->destId;
46,188✔
2747
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
46,188✔
2748
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
46,188✔
2749
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
46,188✔
2750
        pSyncMsg->privateTerm = 0;
46,188✔
2751
        pSyncMsg->timeStamp = tsNow;
46,188✔
2752

2753
        // update reset time
2754
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
46,188✔
2755
        pSyncTimer->timeStamp = tsNow;
46,188✔
2756

2757
        // send msg
2758
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
46,188✔
2759
        STraceId* trace = &(rpcMsg.info.traceId);
46,188✔
2760
        sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
46,188!
2761
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
46,188✔
2762
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
46,188✔
2763
        if (ret != 0) {
46,188✔
2764
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
82!
2765
        }
2766
      }
2767

2768
      if (syncIsInit()) {
46,212!
2769
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
46,212!
2770
        bool stopped = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
46,212✔
2771
                                    syncEnv()->pTimerManager, &pSyncTimer->pTimer);
46,212✔
2772
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
46,212!
2773

2774
      } else {
2775
        sError("sync env is stop, reset peer hb timer error");
×
2776
      }
2777

2778
    } else {
2779
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
3!
2780
             timerLogicClock, msgLogicClock);
2781
    }
2782
  }
2783

2784
  syncHbTimerDataRelease(pData);
46,217✔
2785
  syncNodeRelease(pSyncNode);
46,217✔
2786
}
2787

2788
#ifdef BUILD_NO_CALL
2789
// LRU cache deleter: frees the cached value; key, key length and user data are ignored.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;
  taosMemoryFree(value);
}
2793

2794
// Insert pEntry into the log store's LRU cache keyed by its index, charged as
// sizeof(*pEntry) + dataLen bytes with low priority; the handle is returned via *h.
// Returns 0 on TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2808
#endif
2809

2810
/*
 * Fill `cfg` from an alter-replica request.
 *
 * Voters are copied into nodeInfo[0, replica) and learners into
 * nodeInfo[replica, replica + learnerReplica); each node's dnode info is refreshed via
 * tmsgUpdateDnodeInfo. myIndex is taken from selfIndex (voter) or, if set,
 * learnerSelfIndex offset by the voter count. replicaNum counts voters and
 * totalReplicaNum counts voters + learners.
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // While copying learners, totalReplicaNum temporarily counts learners only and so
  // doubles as the index into learnerReplicas; voters are added back afterwards.
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->totalReplicaNum++;
  }
  cfg->totalReplicaNum += pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
2846

2847
/*
 * Inspect a TDMT_SYNC_CONFIG_CHANGE entry and step down if this (leader) node is no
 * longer part of the new configuration.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR for a non-config-change entry,
 * TSDB_CODE_INVALID_MSG if the payload cannot be decoded, 1 if the node stepped
 * down, and 0 otherwise.
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  // Declared with the type that both the decoder and syncBuildConfigFromReq take.
  SAlterVnodeReplicaReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.totalReplicaNum >= 1 &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    // A node is identified by fqdn + port.
    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        incfg = true;
        break;
      }
    }

    if (!incfg) {
      SyncTerm currentTerm = raftStoreGetTerm(ths);
      syncNodeStepDown(ths, currentTerm);
      return 1;
    }
  }
  return 0;
}
2884

2885
/*
 * Dump the node's membership state at info level: node summary, own node info,
 * each peer's node info / epset / raft id, and each configured node's info and
 * replica raft id. Every line is prefixed with vgId and the caller's tag `str`.
 * `cfg` is not used.
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    // Index the epset by peer (peersEpset[i]); `ths->peersEpset->...` always read
    // peer 0's epset, and the inner loop no longer shadows `i`.
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
2931

2932
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
×
2933
  int32_t i = 0;
×
2934

2935
  // change peersNodeInfo
2936
  i = 0;
×
2937
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
2938
    if (!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
2939
          ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
×
2940
      ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
×
2941
      ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
×
2942
      tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
×
2943
      ths->peersNodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
×
2944
      ths->peersNodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
×
2945

2946
      syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]);
×
2947

2948
      if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i]) == false) {
×
2949
        sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i);
×
2950
        return terrno;
×
2951
      }
2952

2953
      i++;
×
2954
    }
2955
  }
2956
  ths->peersNum = i;
×
2957

2958
  // change cfg nodeInfo
2959
  ths->raftCfg.cfg.replicaNum = 0;
×
2960
  i = 0;
×
2961
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
2962
    if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
2963
      ths->raftCfg.cfg.replicaNum++;
×
2964
    }
2965
    ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
×
2966
    ths->raftCfg.cfg.nodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
×
2967
    tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
×
2968
    ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
×
2969
    ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
×
2970
    if ((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
2971
         ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
×
2972
      ths->raftCfg.cfg.myIndex = i;
×
2973
    }
2974
    i++;
×
2975
  }
2976
  ths->raftCfg.cfg.totalReplicaNum = i;
×
2977

2978
  return 0;
×
2979
}
2980

2981
/*
 * Raise to VOTER every peer entry and every raftCfg.cfg entry whose node
 * (matched by FQDN and port) is a voter in the new config `cfg`.
 * Roles are only ever set to VOTER here; unmatched entries keep their role.
 * raftCfg.cfg.replicaNum is recomputed as the number of cfg entries that
 * matched a voter in `cfg`.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // pass 1: peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      SNodeInfo* pNew = &cfg->nodeInfo[k];
      bool sameNode = (strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) == 0) && (pPeer->nodePort == pNew->nodePort);
      if (sameNode && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }
  }

  // pass 2: raftCfg.cfg.nodeInfo, recounting voters along the way
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t c = 0; c < ths->raftCfg.cfg.totalReplicaNum; ++c) {
    SNodeInfo* pCur = &ths->raftCfg.cfg.nodeInfo[c];
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      SNodeInfo* pNew = &cfg->nodeInfo[k];
      bool sameNode = (strcmp(pCur->nodeFqdn, pNew->nodeFqdn) == 0) && (pCur->nodePort == pNew->nodePort);
      if (sameNode && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        pCur->nodeRole = TAOS_SYNC_ROLE_VOTER;
        ths->raftCfg.cfg.replicaNum++;
      }
    }
  }
}
×
3008

3009
/*
 * Rebuild the per-replica runtime state of `ths` after raftCfg.cfg has been
 * replaced by a new membership. State is carried over (matched by raft id)
 * for replicas present in both the old and the new membership:
 *   1. replicasId, 2. match index, 3. next index, 4. vote trackers (rebuilt
 *   only), 5. log replication managers, 6. snapshot senders (recreated),
 *   7. heartbeat timers, 8. peer states.
 *
 * @param ths                sync node; raftCfg.cfg must already hold the new config
 * @param oldtotalReplicaNum ths->totalReplicaNum before the change
 * @return 0 on success, otherwise an error code (terrno is set too)
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t          code = 0;
  SSyncLogReplMgr* oldLogReplMgrs = NULL;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) {
      return terrno;
    }
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;
  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // keep the old manager: neither leak it nor leave a NULL pointer behind
    ths->pMatchIndex = oldIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    // keep the old manager: neither leak it nor leave a NULL pointer behind
    ths->pNextIndex = oldNextIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  // snapshot the old managers by value before they are destroyed
  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    goto _exit;
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;  // the replica may have moved to a new slot
      }
    }
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      // never report success without a sender
      if (code == 0) code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
      goto _exit;
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    goto _exit;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      goto _exit;
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    goto _exit;
  }

  // 8.rebuild peerStates
  {
    SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
    for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
      oldState[i] = ths->peerStates[i];
    }

    for (int i = 0; i < ths->totalReplicaNum; i++) {
      for (int j = 0; j < oldtotalReplicaNum; j++) {
        if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
          ths->peerStates[i] = oldState[j];
        }
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);
  return 0;

_exit:
  // single cleanup point for every failure after oldLogReplMgrs was allocated
  taosMemoryFree(oldLogReplMgrs);
  TAOS_RETURN(code);
}
3160

3161
/*
 * Refresh runtime state after a replica-type change (learner -> voter).
 * Membership is unchanged, so replica ids, the index managers' contents and
 * the log replication managers are kept. Only the voter counts are copied
 * from raftCfg.cfg.replicaNum, and the vote trackers are rebuilt.
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId: only the voter count changes when 1->3
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // pMatchIndex / pNextIndex: likewise only the voter count changes
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t r = 0; r < pMatch->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, pMatch->index[r]);
  }

  // vote trackers are rebuilt from the node's current state
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs: nothing to change for 1->3
}
×
3185

3186
/*
 * Zero the first peersNum entries of peersNodeInfo and the first
 * raftCfg.cfg.totalReplicaNum entries of raftCfg.cfg.nodeInfo.
 * The counters themselves (peersNum, totalReplicaNum, ...) are left as-is.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t n = 0; n < ths->peersNum; ++n) {
    memset(&ths->peersNodeInfo[n], 0, sizeof(SNodeInfo));
  }

  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    memset(&ths->raftCfg.cfg.nodeInfo[n], 0, sizeof(SNodeInfo));
  }
}
×
3196

3197
/*
 * Apply the membership change carried by a TDMT_SYNC_CONFIG_CHANGE raft entry.
 *
 * The payload after the entry's SMsgHead is deserialized into an alter-vnode
 * request and converted into an SSyncCfg. Entries whose changeVersion is not
 * newer than the current one are skipped (returns 0). Otherwise:
 *   - new cfg has 1 or 2 replicas -> remove replicas: either the other nodes,
 *     or this node itself, in which case it degrades to a lone learner;
 *   - this node currently has 3 replicas -> change replica type;
 *   - otherwise -> add replicas.
 * Afterwards quorum, lastConfigIndex, lastIndex and changeVersion are updated
 * and the cfg file is rewritten.
 *
 * @param ths    sync node
 * @param pEntry raft entry; must have originalRpcType TDMT_SYNC_CONFIG_CHANGE
 * @param str    caller tag for logging; "Commit" selects the commit-path log line
 * @return 0 on success or skip, otherwise an error code
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {  // remove replica
    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        incfg = true;
        break;
      }
    }

    if (incfg) {  // remove other
      syncNodeResetPeerAndCfg(ths);

      // no need to change myNodeInfo

      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }
    } else {  // remove myself
      // strictly nothing is required here; the changes below reduce the chance of a disruptive server

      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: this node becomes the only entry of the cfg
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      // keep myIndex pointing at our own (now only) entry; the old value may
      // refer to a slot that syncNodeResetPeerAndCfg has just zeroed
      ths->raftCfg.cfg.myIndex = 0;
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else {                            // add replica, or change replica type
    if (ths->totalReplicaNum == 3) {  // change replica type
      sInfo("vgId:%d, begin change replica type", ths->vgId);

      // change myNodeInfo
      for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
        if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
            ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
          if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
          }
        }
      }

      // change peer and cfg
      syncNodeChangePeerAndCfgToVoter(ths, &cfg);

      // change other
      syncNodeChangeToVoter(ths);

      // change state
      if (ths->state == TAOS_SYNC_STATE_LEARNER) {
        if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
        }
      }

      ths->restoreFinish = false;
    } else {  // add replica
      sInfo("vgId:%d, begin add replica", ths->vgId);

      // no need to change myNodeInfo

      // change peer and cfg
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // no need to change state

      if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
        ths->restoreFinish = false;
      }
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  if ((code = syncWriteCfgFile(ths)) != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3370

3371
/*
 * Append a raft entry to the log buffer, advance the match index and, where
 * this node can commit on its own, commit.
 *
 * syncNodeAppend takes over pEntry. It is destroyed here when it is rejected:
 * the payload is shorter than an SMsgHead, or syncLogBufferAppend fails, in
 * which case the FSM is also invoked with the current terrno.
 * syncLogBufferProceed runs in every case.
 * - Assigned leader (and no earlier failure): commit up to assignedCommitIndex.
 * - Multi-replica node: returns after the proceed step.
 * - Single-replica node: updates commitIndex and commits up to it.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    int32_t ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (ret != 0) {
      sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");

  if (pEntry != NULL) {
    sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
           ", %" PRId64 ")",
           ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
           ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    TAOS_CHECK_RETURN(syncNodeUpdateAssignedCommitIndex(ths, matchIndex));

    bool fsmReady = (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE);
    if (fsmReady && syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica: nothing more to do here
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica: this node alone decides the commit index
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    // NOTE(review): this overwrites any earlier append failure stored in `code`
    // with the commit result; confirm callers do not rely on seeing it.
    code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex);
    if (code < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
    }
  }

  TAOS_RETURN(code);
}
3431

3432
/*
 * Return true when at least `quorum` non-learner peers were last heard from
 * (pMatchIndex recv time) more than tsHeartbeatTimeout ago.
 * Learner peers and peers with no recorded recv time (0 or -1) are ignored.
 * A node with totalReplicaNum == 1 never reports a timeout.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) return false;

  int32_t staleVoters = 0;
  int64_t now = taosGetTimestampMs();
  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    if (pSyncNode->peersNodeInfo[idx].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &pSyncNode->peersId[idx]);
    bool    known = (lastRecv != 0 && lastRecv != -1);
    if (known && now - lastRecv > tsHeartbeatTimeout) {
      staleVoters++;
    }
  }

  return staleVoters >= pSyncNode->quorum;
}
3457

3458
/* Return true if any of the first totalReplicaNum snapshot senders exists and is started. */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t k = 0; k < pSyncNode->totalReplicaNum; ++k) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[k];
    if (pSender != NULL && pSender->start) return true;
  }
  return false;
}
3469

3470
/* Return true if the node has a new-node snapshot receiver and it is started. */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}
3476

3477
/*
 * Build a no-op entry at the log buffer's end index for the current term and
 * append it via syncNodeAppend, which takes over the entry (and destroys it
 * on failure).
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex nextIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, nextIndex, ths->vgId);
  int32_t         code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop);
  TAOS_RETURN(code);
}
3491

3492
#ifdef BUILD_NO_CALL
/*
 * Legacy no-op append through pLogStore (only built with BUILD_NO_CALL).
 * On a leader the entry is appended and cached; the cache handle is then
 * released. Otherwise (or if nothing was cached) the entry is destroyed.
 * Returns -1 if the entry cannot be built or the append fails, else 0.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       writeIndex = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        curTerm = raftStoreGetTerm(ths);
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, writeIndex, ths->vgId);
  if (pNoop == NULL) return -1;

  LRUHandle* pHandle = NULL;
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t rc = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pNoop, false);
    if (rc != 0) {
      sError("append noop error");
      return -1;
    }
    syncCacheEntry(ths->pLogStore, pNoop, &pHandle);
  }

  if (pHandle == NULL) {
    syncEntryDestroy(pNoop);
  } else {
    taosLRUCacheRelease(ths->pLogStore->pCache, pHandle, false);
  }
  return 0;
}
#endif
3522

3523
/*
 * Handle a heartbeat from a peer.
 * - Drops the message (returns 0) if the sender is not in this raft group.
 * - A learner adopts a newer term carried by the heartbeat.
 * - A non-leader at the same term records the recv time in pNextIndex,
 *   adopts minMatchIndex and schedules an election-timer reset. As follower
 *   or learner it also enqueues a local commit command with the sender's
 *   commitIndex.
 * - A leader / assigned leader seeing an equal-or-newer term enqueues a local
 *   step-down command.
 * A heartbeat reply is sent back to the sender in every non-dropped case.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t        code = 0;
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  // the reply is built up front: every early return below must release rpcMsg.pCont
  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if ((code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)) != 0) {
        rpcFreeCont(rpcMsg.pCont);  // the reply was never sent
        TAOS_RETURN(code);
      }

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t ret = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (ret != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), ret);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      } else {
        rpcFreeCont(rpcMsgLocalCmd.pCont);  // nowhere to enqueue it
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if ((code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)) != 0) {
      rpcFreeCont(rpcMsg.pCont);  // the reply was never sent
      TAOS_RETURN(code);
    }

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t ret = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (ret != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, ret);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      rpcFreeCont(rpcMsgLocalCmd.pCont);  // nowhere to enqueue it
    }
  }

  // reply
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3620

3621
/*
 * Handle a heartbeat reply. The process is:
 * 1. Look up the sender's log replication manager; fail if there is none.
 * 2. Log the reply.
 * 3. Record the recv time in pMatchIndex.
 * 4. Hand the reply to syncLogReplProcessHeartbeatReply.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t         code = 0;
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // print the address as 0x followed by 16 zero-padded hex digits
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3643

3644
#ifdef BUILD_NO_CALL
/*
 * Legacy heartbeat-reply handler (only built with BUILD_NO_CALL): logs the
 * reply and records its recv time in pMatchIndex.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            traceStr[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, traceStr);

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, traceStr);

  // update last reply time, make decision whether the other node is alive or not
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3661

3662
/*
 * Execute a local command enqueued by the heartbeat path.
 * - STEP_DOWN: step down to the command's term.
 * - FOLLOWER_CMT / LEARNER_CMT: if the log buffer's last match term equals the
 *   command's term, adopt its commitIndex. Then commit up to ths->commitIndex
 *   unless the FSM is incomplete.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR only when the last match term is
 * negative. Every other path, including errors that are only logged, returns 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->currentTerm);
    return 0;
  }

  bool isCommitCmd = (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pCmd->cmd == SYNC_LOCAL_CMD_LEARNER_CMT);
  if (!isCommitCmd) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
  }

  bool fsmReady = (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE);
  if (fsmReady && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}
3692

3693
// TLA+ Spec
3694
// ClientRequest(i, v) ==
3695
//     /\ state[i] = Leader
3696
//     /\ LET entry == [term  |-> currentTerm[i],
3697
//                      value |-> v]
3698
//            newLog == Append(log[i], entry)
3699
//        IN  log' = [log EXCEPT ![i] = newLog]
3700
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3701
//                    leaderVars, commitIndex>>
3702
//
3703

3704
/*
 * Turn a client request into a raft entry at the log buffer's end index and
 * append it when this node is leader or assigned leader.
 *
 * For TDMT_SYNC_CONFIG_CHANGE messages, a response stub is saved in
 * pSyncRespMgr first. This happens because a 1->2 config change starts in
 * the write thread and continues in the sync thread.
 * A config-change entry is validated with syncNodeCheckChangeConfig:
 * - A negative result fails the request.
 * - A positive result answers the saved stub and returns that result
 *   without appending.
 *
 * @param pRetIndex optional; receives the entry index when this node is leader
 * @return result of syncNodeAppend, or an error code
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t         code = 0;
  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  bool isLeader = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!isLeader) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex != NULL) {
    *pRetIndex = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // answer the saved stub right away and skip the append
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
             num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry);
  return code;
}
3767

3768
// Translate a sync node state into a short human-readable name (used in log output).
//
// Returns a pointer to a string literal; any state value not listed below maps to "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      // keep the "unknown" fallback
      break;
  }

  return name;
}
3788

3789
// Find this node's slot in a new sync configuration and store it in pNewCfg->myIndex.
//
// Each pNewCfg->nodeInfo entry is converted to a raft id (address from SYNC_ADDR, vgId taken
// from ths) and compared with ths->myRaftId. The first matching slot is used.
//
// Returns 0 when a match is found (pNewCfg->myIndex updated), or TSDB_CODE_SYN_INTERNAL_ERROR
// when this node is not part of pNewCfg (pNewCfg->myIndex is left unchanged).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = -1;

  for (int32_t idx = 0; slot < 0 && idx < pNewCfg->totalReplicaNum; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[idx]),
        .vgId = ths->vgId,
    };
    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      slot = idx;
    }
  }

  if (slot < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  pNewCfg->myIndex = slot;
  return 0;
}
3804

3805
// Decide whether pMsg qualifies for the single-replica optimized path.
//
// All three conditions must hold, checked in this order:
//   1. the group has exactly one replica (ths->replicaNum == 1);
//   2. syncUtilUserCommit() accepts pMsg->msgType;
//   3. the group is not vgId 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  // NOTE(review): vgId 1 is excluded explicitly; presumably a special (e.g. mnode) group — confirm.
  return ths->vgId != 1;
}
3808

3809
// Report whether pRaftId equals one of the first ths->totalReplicaNum entries of ths->replicasId.
// The scan stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool member = false;

  for (int32_t idx = 0; !member && idx < ths->totalReplicaNum; ++idx) {
    member = syncUtilSameId(&ths->replicasId[idx], pRaftId);
  }

  return member;
}
3817

3818
// Look up the snapshot sender associated with the replica whose id equals pDestId.
//
// All ths->totalReplicaNum slots are compared; if more than one matched, the last matching
// slot's sender is returned. Returns NULL when no replica matches (the stored sender pointer
// itself may also be NULL).
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      pFound = ths->senders[idx];
    }
    ++idx;
  }

  return pFound;
}
3827

3828
// Return a pointer to the per-peer heartbeat timer slot of the replica whose id equals pDestId.
//
// Every replica slot is compared; the last matching slot wins. Returns NULL when pDestId is
// not among the first ths->totalReplicaNum replica ids.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      pFound = &ths->peerHeartbeatTimerArr[idx];
    }
    ++idx;
  }

  return pFound;
}
3837

3838
// Return a pointer to the peer-state slot of the replica whose id equals pDestId.
//
// Every replica slot is compared; the last matching slot wins. Returns NULL when pDestId is
// not among the first ths->totalReplicaNum replica ids.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      pFound = &ths->peerStates[idx];
    }
    ++idx;
  }

  return pFound;
}
3847

3848
#ifdef BUILD_NO_CALL
// The helpers below are compiled only when BUILD_NO_CALL is defined.

// Decide whether an append-entries message for pDestId should actually be sent.
//
// Returns false (and logs an error) when no peer state exists for pDestId. Otherwise returns
// false only when the same index (pMsg->prevLogIndex + 1) was last sent to this peer less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS milliseconds ago; in every other case returns true.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool recentDuplicate =
      (pPeer->lastSendIndex == nextIndex) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !recentDuplicate;
}

// Check whether the node is in a state that allows a change to proceed.
//
// Returns false (logging an error) when any of these holds:
//   - pSyncNode->changing is set;
//   - the commit index is valid (>= SYNC_INDEX_BEGIN) but differs from the last log index;
//   - some peer's snapshot sender has its `start` flag set (presumably a snapshot transfer
//     in progress — confirm).
// Returns true otherwise.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex last = syncNodeGetLastIndex(pSyncNode);
    if (last != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSender == NULL || !pSender->start) {
      continue;
    }
    sError("sync cannot change3");
    return false;
  }

  return true;
}
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc