• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4452

05 Jul 2025 10:40AM UTC coverage: 60.734% (-2.8%) from 63.58%
#4452

push

travis-ci

web-flow
Merge pull request #31673 from taosdata/fix/huoh/taos_log

test: fix mnode create failure

149869 of 316704 branches covered (47.32%)

Branch coverage included in aggregate %.

233489 of 314508 relevant lines covered (74.24%)

6990477.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.93
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Opens a sync node described by pSyncInfo and registers it via syncNodeAdd.
 * Copies the ping/election/heartbeat timer settings and the message callbacks
 * from pSyncInfo into the node.
 *
 * Returns the node's reference id (rid), or -1 if the node could not be opened
 * or registered (in the latter case the node is closed again).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t vgId = pSyncInfo->vgId;
  sInfo("vgId:%d, start to open sync", vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;
  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  // message callbacks
  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
18,608✔
88
  int32_t    code = 0;
18,608✔
89
  int32_t    vgId = 0;
18,608✔
90
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,608✔
91
  if (pSyncNode == NULL) {
18,608!
92
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
93
    if (terrno != 0) code = terrno;
×
94
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
95
    TAOS_RETURN(code);
×
96
  }
97
  vgId = pSyncNode->vgId;
18,608✔
98
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
18,608✔
99

100
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
18,608!
101
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
102
    goto _err;
×
103
  }
104
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
18,608✔
105

106
  if ((code = syncNodeStart(pSyncNode)) < 0) {
18,608!
107
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
108
    goto _err;
×
109
  }
110
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
18,608✔
111

112
  syncNodeRelease(pSyncNode);
18,609✔
113

114
  sInfo("vgId:%d, sync started", vgId);
18,608✔
115

116
  TAOS_RETURN(code);
18,608✔
117

118
_err:
×
119
  syncNodeRelease(pSyncNode);
×
120
  TAOS_RETURN(code);
×
121
}
122

123
/*
 * Copies the current raft configuration of the node identified by rid into *cfg.
 * Returns 0 on success; if the node cannot be acquired, returns terrno (or
 * TSDB_CODE_SYN_RETURN_VALUE_NULL when terrno is 0) and leaves *cfg untouched.
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
  if (terrno != 0) code = terrno;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(code);
}
140

141
void syncStop(int64_t rid) {
18,608✔
142
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,608✔
143
  if (pSyncNode != NULL) {
18,608!
144
    pSyncNode->isStart = false;
18,608✔
145
    syncNodeRelease(pSyncNode);
18,608✔
146
    syncNodeRemove(rid);
18,608✔
147
  }
148
}
18,607✔
149

150
void syncPreStop(int64_t rid) {
18,607✔
151
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,607✔
152
  if (pSyncNode != NULL) {
18,608!
153
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
18,608!
154
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
155
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
156
    }
157
    syncNodePreClose(pSyncNode);
18,608✔
158
    syncNodeRelease(pSyncNode);
18,608✔
159
  }
160
}
18,608✔
161

162
void syncPostStop(int64_t rid) {
16,060✔
163
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
16,060✔
164
  if (pSyncNode != NULL) {
16,060!
165
    syncNodePostClose(pSyncNode);
16,060✔
166
    syncNodeRelease(pSyncNode);
16,060✔
167
  }
168
}
16,058✔
169

170
/*
 * Accepts a proposed configuration only if this node is a member of it and the
 * replica count differs from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return abs(delta) <= 1;
  }
  return false;
}
174

175
/*
 * Applies the raft configuration pNewCfg to the node identified by rid.
 *
 * - A configuration whose lastIndex is not newer than the node's
 *   raftCfg.lastConfigIndex is ignored and 0 is returned.
 * - A configuration that does not contain this node, or that changes the
 *   replica count by more than one, is rejected with
 *   TSDB_CODE_SYN_NEW_CONFIG_ERROR. The same code is returned when
 *   syncNodeDoConfigChange fails.
 * - If the node is leader or assigned leader, the heartbeat timer is stopped,
 *   the per-peer heartbeat timers are re-initialized for the new replica set,
 *   and the heartbeat timer is restarted.
 *
 * The node reference acquired here is released on every return path, and the
 * node is never dereferenced after it has been released.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // Log while the reference is still held.
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  if ((code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)) != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // Rebuild the per-peer heartbeat timers so they match the new replica set.
    if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != TSDB_CODE_SUCCESS) {
      goto _exit;
    }

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != TSDB_CODE_SUCCESS) {
        goto _exit;
      }
    }

    if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != TSDB_CODE_SUCCESS) {
      goto _exit;
    }
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
221

222
/*
 * Dispatches an incoming sync RPC message to the handler for pMsg->msgType on
 * the node identified by rid.
 *
 * Returns the handler's result, TSDB_CODE_MSG_NOT_PROCESSED for an unknown
 * message type, or an acquire/init error code. The node reference is held
 * until after the failure log, so the node is never dereferenced after release.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // Both timeout kinds are handled by the same routine.
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  if (code != 0) {
    // Log before releasing: pSyncNode must not be dereferenced after release.
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
291

292
int32_t syncLeaderTransfer(int64_t rid) {
18,608✔
293
  int32_t    code = 0;
18,608✔
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,608✔
295
  if (pSyncNode == NULL) {
18,608!
296
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
297
    if (terrno != 0) code = terrno;
×
298
    TAOS_RETURN(code);
×
299
  }
300

301
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
18,608✔
302
  syncNodeRelease(pSyncNode);
18,608✔
303
  return ret;
18,607✔
304
}
305

306
/*
 * Forces ths into follower state (passing an all-zero raft id) and replies to
 * the request using the response buffer carried in pRpcMsg->info.
 * Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId noLeader = {0};
  syncNodeBecomeFollower(ths, noLeader, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
320

321
/*
 * Handles an arbitrator "set assigned leader" request.
 *
 * The request payload follows an SMsgHead prefix in pRpcMsg->pCont. The request
 * is skipped (code TSDB_CODE_MND_ARB_TOKEN_MISMATCH) if its arbTerm is older
 * than ths->arbTerm, or, when not forced, if its member token does not match
 * ths->arbToken. Otherwise, if the node is not already assigned leader, the
 * raft term is advanced, the node becomes assigned leader and a noop entry is
 * appended (a noop failure is only logged).
 *
 * A response carrying an SVArbSetAssignedLeaderRsp (on success) and the result
 * code is always sent. Returns that code.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // The serialized request follows the SMsgHead prefix, so its length excludes that header.
  if (pRpcMsg->contLen < (int32_t)sizeof(SMsgHead) ||
      tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead),
                                            pRpcMsg->contLen - (int32_t)sizeof(SMsgHead), &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    // raftStoreNextTerm's outcome is checked through terrno below; clear any stale value first.
    terrno = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    contLen = 0;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    contLen = 0;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // The freed buffer must not be handed to tmsgSendRsp below.
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
403

404
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
405
  int32_t    code = 0;
×
406
  SSyncNode* pNode = syncNodeAcquire(rid);
×
407
  if (pNode == NULL) {
×
408
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
409
    if (terrno != 0) code = terrno;
×
410
    TAOS_RETURN(code);
×
411
  }
412

413
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
414
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
415
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
416

417
  syncNodeRelease(pNode);
×
418
  if (ret == 1) {
×
419
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
420
    code = rpcSendResponse(&rpcMsg);
×
421
    return code;
×
422
  } else {
423
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
424
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
425
  }
426
}
427

428
/*
 * Returns the smallest match index among the node's peers. The first peer's
 * value is taken as-is; later peers replace it only when their index is
 * positive and smaller. Returns SYNC_INDEX_INVALID when there are no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SyncIndex idx = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    if (lowest == SYNC_INDEX_INVALID || (idx > 0 && idx < lowest)) {
      lowest = idx;
    }
  }

  return lowest;
}
441

442
/* Asks the node's log store which index must be retained to keep `bytes` of log. */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  return (*pSyncNode->pLogStore->syncLogIndexRetention)(pSyncNode->pLogStore, bytes);
}
445

446
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
30,709✔
447
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
30,709✔
448
  int32_t    code = 0;
30,709✔
449
  if (pSyncNode == NULL) {
30,709✔
450
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
12✔
451
    if (terrno != 0) code = terrno;
12!
452
    sError("sync begin snapshot error");
12!
453
    TAOS_RETURN(code);
12✔
454
  }
455

456
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
30,697✔
457
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
30,697✔
458
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
30,697✔
459

460
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
30,697!
461
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
88!
462
    syncNodeRelease(pSyncNode);
88✔
463
    return 0;
88✔
464
  }
465

466
  int64_t logRetention = 0;
30,609✔
467

468
  if (syncNodeIsMnode(pSyncNode)) {
30,609✔
469
    // mnode
470
    logRetention = tsMndLogRetention;
4,654✔
471
  } else {
472
    // vnode
473
    if (pSyncNode->replicaNum > 1) {
25,955✔
474
      logRetention = SYNC_VNODE_LOG_RETENTION;
1,445✔
475
    }
476
  }
477

478
  if (pSyncNode->totalReplicaNum > 1) {
30,609✔
479
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
1,954✔
480
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
181!
481
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
9!
482
              lastApplyIndex);
483
      syncNodeRelease(pSyncNode);
9✔
484
      return 0;
9✔
485
    }
486
    SyncIndex retentionIndex =
1,945✔
487
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
1,945✔
488
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
1,945✔
489
  }
490

491
_DEL_WAL:
28,655✔
492

493
  do {
494
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
30,600✔
495
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
30,600✔
496
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
30,600✔
497
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
30,600✔
498
    if (lastApplyIndex <= walCommitVer) {
30,600!
499
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
30,600✔
500

501
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
30,600!
502
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
30,600✔
503
        pSyncNode->snapshottingTime = taosGetTimestampMs();
30,600✔
504

505
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
30,600✔
506
        if (code == 0) {
30,600!
507
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
30,600✔
508
                  pSyncNode->snapshottingIndex, lastApplyIndex);
509
        } else {
510
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
511
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
512
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
513
        }
514

515
      } else {
516
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
517
                snapshottingIndex, lastApplyIndex);
518
      }
519
    }
520
  } while (0);
521

522
  syncNodeRelease(pSyncNode);
30,600✔
523
  TAOS_RETURN(code);
30,600✔
524
}
525

526
int32_t syncEndSnapshot(int64_t rid) {
30,696✔
527
  int32_t    code = 0;
30,696✔
528
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
30,696✔
529
  if (pSyncNode == NULL) {
30,697!
530
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
531
    if (terrno != 0) code = terrno;
×
532
    sError("sync end snapshot error");
×
533
    TAOS_RETURN(code);
×
534
  }
535

536
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
30,697✔
537
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
30,600✔
538
    code = walEndSnapshot(pData->pWal);
30,600✔
539
    if (code != 0) {
30,600!
540
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
541
      syncNodeRelease(pSyncNode);
×
542
      TAOS_RETURN(code);
×
543
    } else {
544
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
30,600✔
545
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
30,600✔
546
    }
547
  }
548

549
  syncNodeRelease(pSyncNode);
30,697✔
550
  TAOS_RETURN(code);
30,697✔
551
}
552

553
/*
 * Returns true when pSyncNode can serve reads: it is leader or assigned leader
 * and has finished restoring. Otherwise returns false and sets terrno to
 * TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER, or
 * TSDB_CODE_SYN_RESTORING.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool isLeader = pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER;
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
572

573
bool syncIsReadyForRead(int64_t rid) {
3,637,998✔
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
3,637,998✔
575
  if (pSyncNode == NULL) {
3,639,411!
576
    sError("sync ready for read error");
×
577
    return false;
×
578
  }
579

580
  bool ready = syncNodeIsReadyForRead(pSyncNode);
3,639,411✔
581

582
  syncNodeRelease(pSyncNode);
3,639,739✔
583
  return ready;
3,638,565✔
584
}
585

586
#ifdef BUILD_NO_CALL
/* Returns whether the node identified by rid is sending a snapshot; false if it cannot be acquired. */
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

/* Returns whether the node identified by rid is receiving a snapshot; false if it cannot be acquired. */
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return receiving;
}
#endif
609

610
/*
 * Hands leadership to a peer when this node is leader of a multi-replica group.
 * The first peer is chosen by default. With exactly two peers, the second is
 * chosen if its match index is strictly higher.
 * Returns 0 when there is nothing to do, otherwise syncNodeLeaderTransferTo's
 * result.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}
631

632
/*
 * Builds a leader-transfer message naming newLeader and proposes it through
 * syncNodePropose. The message buffer is freed here after proposing.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR for a single-replica group, otherwise
 * the build or propose result.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg msg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&msg, pSyncNode->vgId));

  SyncLeaderTransfer* pTransfer = msg.pCont;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newNodeInfo = newLeader;

  int32_t proposeResult = syncNodePropose(pSyncNode, &msg, false, NULL);
  rpcFreeCont(msg.pCont);
  return proposeResult;
}
652

653
SSyncState syncGetState(int64_t rid) {
1,824,599✔
654
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
1,824,599✔
655

656
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,824,599✔
657
  if (pSyncNode != NULL) {
1,825,356✔
658
    state.state = pSyncNode->state;
1,825,342✔
659
    state.roleTimeMs = pSyncNode->roleTimeMs;
1,825,342✔
660
    state.startTimeMs = pSyncNode->startTime;
1,825,342✔
661
    state.restored = pSyncNode->restoreFinish;
1,825,342✔
662
    if (pSyncNode->vgId != 1) {
1,825,342✔
663
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
435,633✔
664
    } else {
665
      state.canRead = state.restored;
1,389,709✔
666
    }
667
    /*
668
    double progress = 0;
669
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
670
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
671
      state.progress = (int32_t)(progress * 100);
672
    }
673
    else{
674
      state.progress = -1;
675
    }
676
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
677
            "progress:%lf, progress:%d",
678
          pSyncNode->vgId,
679
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
680
    */
681
    state.term = raftStoreGetTerm(pSyncNode);
1,825,321✔
682
    syncNodeRelease(pSyncNode);
1,825,373✔
683
  }
684

685
  return state;
1,825,349✔
686
}
687

688
SSyncMetrics syncGetMetrics(int64_t rid) {
×
689
  SSyncMetrics metrics = {0};
×
690

691
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
692
  if (pSyncNode != NULL) {
×
693
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
694
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
695
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
696
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
697
    syncNodeRelease(pSyncNode);
×
698
  }
699
  return metrics;
×
700
}
701

702
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
703
  if (pOldMetrics == NULL) return;
×
704

705
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
706
  if (pSyncNode != NULL) {
×
707
    // Atomically subtract the old metrics values from current metrics
708
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
709
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
710
    syncNodeRelease(pSyncNode);
×
711
  }
712
}
713

714
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
309,591✔
715
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
309,591✔
716
  if (pSyncNode != NULL) {
309,591!
717
    *syncCommitIndex = pSyncNode->commitIndex;
309,591✔
718
    syncNodeRelease(pSyncNode);
309,591✔
719
  }
720
}
309,591✔
721

722
int32_t syncGetArbToken(int64_t rid, char* outToken) {
51,044✔
723
  int32_t    code = 0;
51,044✔
724
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
51,044✔
725
  if (pSyncNode == NULL) {
51,044!
726
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
727
    if (terrno != 0) code = terrno;
×
728
    TAOS_RETURN(code);
×
729
  }
730

731
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
51,044✔
732
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
51,044✔
733
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
51,044✔
734
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
51,044✔
735

736
  syncNodeRelease(pSyncNode);
51,044✔
737
  TAOS_RETURN(code);
51,044✔
738
}
739

740
int32_t syncCheckSynced(int64_t rid) {
3✔
741
  int32_t    code = 0;
3✔
742
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
3✔
743
  if (pSyncNode == NULL) {
3!
744
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
745
    if (terrno != 0) code = terrno;
×
746
    TAOS_RETURN(code);
×
747
  }
748

749
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
3!
750
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
751
    syncNodeRelease(pSyncNode);
×
752
    TAOS_RETURN(code);
×
753
  }
754

755
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
3✔
756
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
3!
757

758
  syncNodeRelease(pSyncNode);
3✔
759
  TAOS_RETURN(code);
3✔
760
}
761

762
/*
 * Raises the node's arbitrator term to arbTerm if arbTerm is larger; never
 * lowers it. Returns 0, or an acquire error.
 */
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  pNode->arbTerm = TMAX(arbTerm, pNode->arbTerm);
  syncNodeRelease(pNode);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
775

776
/*
 * Returns the largest recorded config-change index that is greater than the
 * first entry and not beyond snapshotLastApplyIndex, falling back to the first
 * entry (configIndexArr[0]).
 * Returns -2 and sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when no config
 * index is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  int32_t count = pSyncNode->raftCfg.configIndexCount;
  if (count < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId, count);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  const SyncIndex* pIndexes = pSyncNode->raftCfg.configIndexArr;
  SyncIndex        best = pIndexes[0];
  // Element 0 is the starting value, so the scan can begin at 1.
  for (int32_t i = 1; i < count; ++i) {
    SyncIndex candidate = pIndexes[i];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         best);
  return best;
}
796

797
// Look up the raft id of the peer whose fqdn and port both match pEp.
// Only the peer list (peersNodeInfo/peersId) is searched, not this node itself.
// Returns EMPTY_RAFT_ID when no peer matches.
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t idx = 0;
  while (idx < pSyncNode->peersNum) {
    const SNodeInfo* pPeer = &pSyncNode->peersNodeInfo[idx];
    if (pEp->port == pPeer->nodePort && strcmp(pEp->fqdn, pPeer->nodeFqdn) == 0) {
      return pSyncNode->peersId[idx];
    }
    ++idx;
  }
  return EMPTY_RAFT_ID;
}
806

807
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
178,856✔
808
  if (pEpSet == NULL || buffer == NULL) {
178,856!
809
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
810
    return;
×
811
  }
812

813
  size_t offset = 0;
178,860✔
814
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
178,860✔
815

816
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
499,165✔
817
    if (offset >= bufferSize) break;
320,305!
818
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
320,305✔
819
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
320,305✔
820
  }
821

822
  if (offset < bufferSize) {
178,860!
823
    snprintf(buffer + offset, bufferSize - offset, "]");
178,866✔
824
  }
825
}
826

827
// Build the endpoint set a client should retry against for the sync node rid.
// Every non-learner replica from the raft config is appended to pEpSet.
// inUse is chosen as follows:
//   1. the endpoint matching the cached leader endpoint (leaderCacheEp), if any;
//   2. otherwise myIndex when this node is itself the cached leader;
//   3. otherwise (myIndex + 1) % numOfEps.
// The set is then passed through epsetSort. numOfEps stays 0 if the node
// cannot be acquired.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pNode->vgId, pNode->leaderCache.addr, pNode->leaderCacheEp.fqdn, pNode->leaderCacheEp.port);

  const SSyncCfg* pCfg = &pNode->raftCfg.cfg;
  int             leaderPos = -1;

  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    // numOfEps doubles as the write position of the next endpoint
    int  pos = pEpSet->numOfEps;
    SEp* pEp = &pEpSet->eps[pos];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;

    SRaftId id = syncGetRaftIdByEp(pNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pNode->vgId, i, id.addr, pEp->fqdn,
           pEp->port);

    bool isCachedLeader =
        pEp->port == pNode->leaderCacheEp.port && strncmp(pEp->fqdn, pNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0;
    if (isCachedLeader) {
      leaderPos = pos;
    }
  }

  if (pEpSet->numOfEps > 0) {
    bool selfIsLeader =
        pNode->myRaftId.addr == pNode->leaderCache.addr && pNode->myRaftId.vgId == pNode->leaderCache.vgId;
    if (leaderPos != -1) {
      pEpSet->inUse = leaderPos;
    } else if (selfIsLeader) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char desc[1024];
  epsetToString(pEpSet, desc, sizeof(desc));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pNode->vgId, pEpSet->numOfEps, desc,
         pEpSet->inUse);
  syncNodeRelease(pNode);
}
875

876
// Propose pMsg through the sync node identified by rid.
// This acquires the node, delegates to syncNodePropose and releases the node
// again. When the node cannot be acquired, returns (and stores in terrno) the
// acquire error, falling back to TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return result;
  }

  int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("sync propose error");
  TAOS_RETURN(err);
}
890

891
int32_t syncCheckMember(int64_t rid) {
×
892
  int32_t    code = 0;
×
893
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
894
  if (pSyncNode == NULL) {
×
895
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
896
    if (terrno != 0) code = terrno;
×
897
    sError("sync propose error");
×
898
    TAOS_RETURN(code);
×
899
  }
900

901
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
902
    syncNodeRelease(pSyncNode);
×
903
    return TSDB_CODE_SYN_WRONG_ROLE;
×
904
  }
905

906
  syncNodeRelease(pSyncNode);
×
907
  return 0;
×
908
}
909

910
int32_t syncIsCatchUp(int64_t rid) {
6,603✔
911
  int32_t    code = 0;
6,603✔
912
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,603✔
913
  if (pSyncNode == NULL) {
6,603!
914
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
915
    if (terrno != 0) code = terrno;
×
916
    sError("sync Node Acquire error since %d", ERRNO);
×
917
    TAOS_RETURN(code);
×
918
  }
919

920
  int32_t isCatchUp = 0;
6,603✔
921
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
6,603!
922
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,262!
923
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,262✔
924
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
6,209!
925
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
926
          pSyncNode->pLogBuf->matchIndex);
927
    isCatchUp = 0;
6,209✔
928
  } else {
929
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
394!
930
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
931
    isCatchUp = 1;
394✔
932
  }
933

934
  syncNodeRelease(pSyncNode);
6,603✔
935
  return isCatchUp;
6,603✔
936
}
937

938
ESyncRole syncGetRole(int64_t rid) {
6,603✔
939
  int32_t    code = 0;
6,603✔
940
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,603✔
941
  if (pSyncNode == NULL) {
6,603!
942
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
943
    if (terrno != 0) code = terrno;
×
944
    sError("sync Node Acquire error since %d", ERRNO);
×
945
    TAOS_RETURN(code);
×
946
  }
947

948
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
6,603✔
949

950
  syncNodeRelease(pSyncNode);
6,603✔
951
  return role;
6,603✔
952
}
953

954
int64_t syncGetTerm(int64_t rid) {
20,193✔
955
  int32_t    code = 0;
20,193✔
956
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
20,193✔
957
  if (pSyncNode == NULL) {
20,193!
958
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
959
    if (terrno != 0) code = terrno;
×
960
    sError("sync Node Acquire error since %d", ERRNO);
×
961
    TAOS_RETURN(code);
×
962
  }
963

964
  int64_t term = raftStoreGetTerm(pSyncNode);
20,193✔
965

966
  syncNodeRelease(pSyncNode);
20,193✔
967
  return term;
20,193✔
968
}
969

970
// Propose a client message through pSyncNode.
//
// Rejects the proposal when:
// - the node is neither LEADER nor ASSIGNED_LEADER (TSDB_CODE_SYN_NOT_LEADER);
// - restore has not finished (TSDB_CODE_SYN_PROPOSE_NOT_READY);
// - a non-assigned leader has hit a heartbeat reply timeout
//   (TSDB_CODE_SYN_PROPOSE_NOT_READY).
//
// Optimized single-replica path (syncNodeIsOptimizedOneReplica):
// - the message is handled inline by syncNodeOnClientRequest, and
//   applyIndex/applyTerm are stored in pMsg->info.conn;
// - returns 1 when replicaNum is 1;
// - returns 0 when the replica count changed meanwhile, so the caller falls
//   back to normal handling.
//
// Normal path:
// - a response stub is registered in pSyncRespMgr;
// - a client-request message is built and enqueued via syncEqMsg;
// - *seq (if non-NULL) receives the stub's sequence number;
// - returns the enqueue result.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub but keep reporting the serialization failure: returning the
    // result of syncRespMgrDel (normally 0) would turn this failure into a success.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
             tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1042

1043
// Reset a peer heartbeat timer descriptor for destination destId.
// The timer is left unarmed (pTimer = NULL) with zeroed counter and logic
// clock. Its period is the node's hbBaseLine and its callback is
// syncNodeEqPeerHeartbeatTimer.
// Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->destId = destId;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}
1053

1054
// Arm (or re-arm) a peer heartbeat timer.
// The timer's SSyncHbTimerData is reused when hbDataRid is still registered;
// otherwise a new one is allocated and registered. The data record carries the
// node rid, destination, logic clock and next execution time, and its rid is
// passed to the timer callback.
// Returns:
// - 0 on success;
// - TSDB_CODE_OUT_OF_MEMORY when the data record cannot be allocated;
// - TSDB_CODE_SYN_INTERNAL_ERROR when the sync env is not initialized or the
//   timer reset reports the timer as stopped.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return code;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    // previously dereferenced without a check
    if (pData == NULL) {
      sError("vgId:%d, failed to alloc hb timer data since out of memory", pSyncNode->vgId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
         pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}
1087

1088
// Stop a peer heartbeat timer and unregister its timer data.
// The logic clock is bumped before stopping. Presumably this lets an in-flight
// callback that captured the old clock value recognize itself as stale
// (the callback is not visible here; confirm).
// Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool stopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;
  return 0;
}
1098

1099
// Realign the log store with the fsm snapshot when they have diverged.
// A restore from the snapshot's lastApplyIndex is triggered when any of these
// holds:
// - the log ends before the snapshot;
// - the log begins after snapshot + 1, leaving a gap;
// - the fsm state is SYNC_FSM_STATE_INCOMPLETE.
// Returns 0 when nothing had to be done or the restore succeeded.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the fsm or its
// FpGetSnapshotInfo hook is missing, and the restore's error code when the
// restore fails.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO: the return value of FpGetSnapshotInfo is still not checked
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapshotVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logBehindSnapshot = lastVer < snapshotVer;
  bool gapAfterSnapshot = firstVer > snapshotVer + 1;
  bool fsmIncomplete = pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;
  if (!logBehindSnapshot && !gapAfterSnapshot && !fsmIncomplete) {
    TAOS_RETURN(0);
  }

  int32_t code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
  }
  TAOS_RETURN(code);
}
1129

1130
// open/close --------------
1131
// Create and initialize a sync node from pSyncInfo.
//
// Steps:
// - create the sync directory if absent;
// - create raft_config.json, or load it and reconcile it with
//   pSyncInfo->syncCfg according to vnodeVersion (which may overwrite
//   pSyncInfo->syncCfg with the file's config);
// - build the raft state: log buffer, replica and peer ids, raft store,
//   vote/index managers, log store, timers, snapshot senders/receiver,
//   replication manager, peer state and log buffer contents.
//
// Ownership: pSyncInfo->pFsm is moved into the node part-way through (and
// pSyncInfo->pFsm becomes NULL). If opening fails before that point, the
// _error path frees it.
//
// Returns the new node, or NULL on failure. On failure the partially built
// node is released through syncNodeClose, and terrno carries the error where
// the failing step reports one.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // report the version actually compared against: the one loaded from file
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    terrno = (code != 0) ? code : TSDB_CODE_OUT_OF_MEMORY;
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm; ownership of the fsm moves into the node here
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
  pSyncNode->leaderCacheEp.port = 0;
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // go through _error instead of returning directly, which leaked the
      // partially opened node together with the fsm it now owns
      terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to create snapshot sender, replica:%d since %s", pSyncNode->vgId, i, tstrerror(terrno));
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to create snapshot receiver since %s", pSyncNode->vgId, tstrerror(terrno));
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the fsm is still owned by pSyncInfo only if we failed before it was moved
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1496

1497
#ifdef BUILD_NO_CALL
/*
 * Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
 * FSM provides FpGetSnapshotInfo and the snapshot is ahead of the current
 * commit index. The snapshot-info call's return value is ignored.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}
#endif
1508

1509
/*
 * Restore commit progress of a sync node from its log store.
 *
 * Snapshots the log store's last index and commit index, and the log buffer's
 * end index, under the log buffer mutex. It then advances
 * pSyncNode->commitIndex to at least the stored commit index and, unless the
 * FSM state is SYNC_FSM_STATE_INCOMPLETE, commits the log buffer up to that
 * index.
 *
 * Returns:
 *   TSDB_CODE_SYN_INTERNAL_ERROR if the log store or the log buffer is missing;
 *   otherwise the result of syncLogBufferCommit when it is attempted.
 *   NOTE(review): when the commit is skipped (FSM incomplete) and the WAL/buffer
 *   mismatch below was detected, TSDB_CODE_WAL_LOG_INCOMPLETE is returned even
 *   though the mismatch is otherwise treated as non-fatal.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // The error lives in the local `code` (terrno is not set here), so format it
    // with tstrerror(code); terrstr() would report an unrelated, stale error.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // Deliberately not returned here: the mismatch is only a warning.
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore")) < 0) {
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1544

1545
/*
 * Start raft on an opened sync node.
 *
 * The initial role comes from this node's entry in the raft config:
 *   - learner role            -> become learner;
 *   - single voting replica   -> bump the term, become leader and append a no-op;
 *   - otherwise               -> become follower with an empty leader id.
 * After that the ping timer is armed.
 *
 * Returns the error from syncNodeAppendNoop (single-replica case), otherwise
 * the result of syncNodeStartPingTimer (a failure there is logged).
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return ret;
}
1571

1572
#ifdef BUILD_NO_CALL
/*
 * Put the node into follower state with heartbeat timers stopped, the elect
 * timer pushed out to TIMER_MAX_MS, and the ping timer armed.
 *
 * Returns 0 on success, or the error code of the first timer operation that
 * fails. Earlier revisions returned -1 and logged terrstr(); the local `code`
 * is now reported and propagated instead.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }
  return code;
}
#endif
1597

1598
/*
 * First phase of closing a sync node.
 *
 * Stops the elect, heartbeat and ping timers (in that order), then cleans the
 * pending responses held by the response manager.
 *
 * Logs and returns early (leaving responses uncleaned) if the node, its FSM,
 * or the FSM's FpApplyQueueItems callback is NULL, or if any timer fails to
 * stop.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1634

1635
/*
 * Second phase of closing a sync node: stop the new-node snapshot receiver if
 * it is running, destroy it, and clear the pointer.
 *
 * A NULL node or a node without a receiver is a no-op.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  // Guard against NULL in the same way the pre-close and close paths do.
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // The message previously said "preclose" although this runs in post-close.
  sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
1647

1648
/* Release a heartbeat timer data object with taosMemoryFree. */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
1649

1650
/*
 * Tear down a sync node and free it. A NULL node is a no-op.
 *
 * Order matters and is:
 *   1. clean pending responses;
 *   2. stop the ping, elect and heartbeat timers;
 *   3. destroy log replication managers, the response manager, vote managers,
 *      next/match index managers, the log store and the log buffer;
 *   4. destroy the arb token mutex;
 *   5. stop (if running) and destroy every snapshot sender and the new-node
 *      snapshot receiver;
 *   6. free the FSM, close the raft store, and free the node itself.
 *
 * If any timer fails to stop, the error is logged and the function returns
 * before step 3, so nothing else is released.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  // Destroy owned managers and stores, clearing each pointer afterwards.
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  const int32_t senderSlots = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  for (int32_t slot = 0; slot < senderSlots; ++slot) {
    if (pSyncNode->senders[slot] == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);

    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1719

1720
/* Return the snapshot strategy stored in the node's raft config. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
1721

1722
// timer control --------------
1723
/*
 * Arm the ping timer (priority 2) with the node's rid as the callback param,
 * then publish pingTimerLogicClockUser into pingTimerLogicClock.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if taosTmrResetPriority reports the
 * timer as stopped. If the sync env is not initialized, an error is logged
 * but 0 is still returned.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1738

1739
/*
 * Stop the ping timer: bump pingTimerLogicClockUser, stop the timer handle,
 * and clear pPingTimer. Always returns 0.
 *
 * NOTE(review): the clock bump presumably lets an already-fired callback detect
 * that it is stale; confirm against the ping timer callback.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1747

1748
/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * Records electTimerMS, fills electTimerParam (absolute execute time, current
 * electTimerLogicClock, back-pointer to the node, no extra data), and resets
 * the timer with the node's rid as callback param. A "stopped" result from
 * taosTmrReset is only logged as a warning.
 *
 * Always returns 0; if the sync env is not initialized an error is logged.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1767

1768
/*
 * Stop the election timer: bump electTimerLogicClock, stop the timer handle,
 * and clear pElectTimer. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1777

1778
/*
 * Stop and re-arm the election timer with a new timeout of `ms` milliseconds.
 * Stopping first bumps electTimerLogicClock, which the start then records in
 * electTimerParam. Returns 0, or the first failing step's code (via
 * TAOS_CHECK_RETURN).
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1784

1785
/*
 * Restart the election timer with a fresh timeout.
 *
 * A standby node uses TIMER_MAX_MS; otherwise the timeout is drawn by
 * syncUtilElectRandomMS from [electBaseLine, 2 * electBaseLine]. A restart
 * failure is logged as a warning and the trace log is skipped.
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = 0;
  if (!pSyncNode->raftCfg.isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  } else {
    electMS = TIMER_MAX_MS;
  }

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1803

1804
#ifdef BUILD_NO_CALL
/*
 * Arm the node-wide heartbeat timer and publish heartbeatTimerLogicClockUser
 * into heartbeatTimerLogicClock. Only compiled with BUILD_NO_CALL.
 *
 * taosTmrReset returns a bool ("stopped"), not an error code. Passing it to
 * TAOS_CHECK_RETURN would have turned `true` into error code 1. It is checked
 * explicitly instead, mirroring syncNodeStartPingTimer.
 *
 * Returns 0, or TSDB_CODE_SYN_INTERNAL_ERROR if the timer reset reports
 * stopped. If the sync env is not initialized an error is logged and 0 is
 * returned.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
#endif
1819

1820
/*
 * Start the per-peer heartbeat timers. Peers for which syncNodeGetHbTimer
 * returns NULL are skipped.
 *
 * Returns 0, or the first syncHbTimerStart error (via TAOS_CHECK_RETURN);
 * peers after a failing one are not started.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pTimer));
  }

  return 0;
}
1837

1838
/*
 * Stop the per-peer heartbeat timers. Peers for which syncNodeGetHbTimer
 * returns NULL are skipped.
 *
 * Every peer's timer is attempted even if an earlier one fails. Previously the
 * first failure returned immediately and left the remaining peers' timers
 * running, which callers (e.g. when becoming follower or closing) do not
 * expect.
 *
 * Returns 0 if all stops succeed; otherwise the first error code, which is
 * also stored in terrno (as TAOS_RETURN does).
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
    if (pSyncTimer == NULL) continue;

    int32_t ret = syncHbTimerStop(pSyncNode, pSyncTimer);
    if (ret != 0) {
      sError("vgId:%d, failed to stop heartbeat timer of peer index:%d since %s", pSyncNode->vgId, i, tstrerror(ret));
      if (code == 0) code = ret;  // remember the first failure, keep stopping the rest
    }
  }

  if (code != 0) {
    TAOS_RETURN(code);
  }
  return code;
}
1857

1858
#ifdef BUILD_NO_CALL
/*
 * Stop and then restart all per-peer heartbeat timers. Returns 0, or the first
 * failing step's code (via TAOS_CHECK_RETURN).
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
#endif
1867

1868
/*
 * Send a sync message to the peer whose raft address matches destRaftId.
 *
 * The peer's epset is found by linear search over peersId. When a send
 * callback and an epset are both available, the message body is converted
 * with syncUtilMsgHtoN, marked noResp, and handed to syncSendMSg.
 *
 * Returns the send callback's result, or -1 if the peer is unknown or no send
 * callback is set. On any negative result the message content is freed here.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  int32_t peer = 0;
  while (peer < pNode->peersNum && destRaftId->addr != pNode->peersId[peer].addr) {
    ++peer;
  }
  SEpSet* epSet = (peer < pNode->peersNum) ? &pNode->peersEpset[peer] : NULL;

  int32_t code = -1;
  bool canSend = (pNode->syncSendMSg != NULL) && (epSet != NULL);
  if (canSend) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           epSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1892

1893
/*
 * Decide whether this node is a member of pCfg.
 *
 * Membership is checked two ways: by endpoint (fqdn + port against
 * myNodeInfo) and by raft id (SYNC_ADDR of each node plus this vgId against
 * myRaftId). If the two checks disagree, terrno is set to
 * TSDB_CODE_SYN_INTERNAL_ERROR and false is returned; otherwise the agreed
 * result is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool matchedByEp = false;
  for (int32_t i = 0; !matchedByEp && i < pCfg->totalReplicaNum; ++i) {
    matchedByEp = strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                  pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort;
  }

  bool matchedById = false;
  for (int32_t i = 0; !matchedById && i < pCfg->totalReplicaNum; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    matchedById = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (matchedByEp != matchedById) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return matchedByEp;
}
1923

1924
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
4,741✔
1925
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
4,741✔
1926
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
3,109✔
1927
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
7,503✔
1928
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
5,475✔
1929
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
5,475✔
1930
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
5,475!
1931
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
5,475✔
1932
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
5,472✔
1933
  }
1934

1935
  return false;
2,028✔
1936
}
1937

1938
/*
 * Apply a new sync configuration to the node.
 *
 * If pNewConfig does not differ from the current config (per
 * syncIsConfigChanged), nothing happens. Otherwise the config replaces
 * pSyncNode->raftCfg.cfg, lastConfigChangeIndex is recorded, and the standby
 * flag is updated: cleared if this node is in the new config, set if it was
 * dropped. The index is then appended to the config index history.
 *
 * When this node is a member of the new config, it rebuilds its own raft id,
 * its peer/replica tables, the quorum, the index and vote managers, and the
 * per-replica snapshot senders. Senders of replicas present in both configs
 * are moved to their new slot; missing ones are created; leftover old ones are
 * destroyed. In both the member and non-member cases the config file is
 * written.
 *
 * Returns 0 on success (including "not changed"); -1 with terrno set to
 * TSDB_CODE_OUT_OF_RANGE if the config index history is full; terrno if a node
 * info cannot be converted to a raft id; or the error from sender creation or
 * syncWriteCfgFile.
 */
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // previously hard-coded "vgId:1" regardless of the node
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // dropped: a member of the old config but not of the new one
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change. The arguments previously started with vgId, which
  // shifted every value by one (int32 into %PRId64 and int64 into %d: UB).
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they were created for
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;  // renamed from `j`, which the sender loop below shadowed
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move an old sender into slot i when its replica id is still present
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old senders that were not moved into a new slot
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2110

2111
// raft state change --------------
2112
/*
 * Move the raft store's term forward to `term` if it is newer, without
 * changing the node's role. Older or equal terms are ignored.
 */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
2117

2118
/*
 * Step down in response to seeing `newTerm` from node `id`.
 *
 * Terms older than the current one are ignored. An assigned leader regenerates
 * its arbToken under arbTokenMutex. With a newer term, the term is stored, the
 * node becomes follower of `id`, and the vote is cleared. With the same term,
 * a non-follower becomes follower of `id`.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2148

2149
/*
 * Clean the pending responses held by the node's response manager (used e.g.
 * when the node becomes follower).
 */
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
2150

2151
/*
 * Transition the node to follower with `leaderId` as the cached leader.
 *
 * If the node was leader, the cached leader endpoint is cleared first. If
 * `leaderId` matches a replica, the endpoint is filled from that replica's
 * config entry. The node then sets state/roleTimeMs, stops the heartbeat
 * timers, cleans pending responses, invokes FpBecomeFollowerCb if present,
 * resets minMatchIndex and the log buffer, and restarts the election timer.
 *
 * Returns early (after logging) if the heartbeat timers cannot be stopped or
 * the log buffer cannot be reset.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;
  pSyncNode->leaderCache = leaderId;

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    if (!syncUtilSameId(&pSyncNode->replicasId[r], &leaderId)) continue;

    pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[r].nodePort;
    strncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[r].nodeFqdn, TSDB_FQDN_LEN);
    break;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  int32_t code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "become follower %s", debugStr);

  // answer clients whose requests were pending on the old role
  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasFollowerCb = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL;
  if (hasFollowerCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2202

2203
/*
 * Transition the node to learner: reset hbSlowNum, set state/roleTimeMs,
 * invoke FpBecomeLearnerCb if present, reset minMatchIndex, and reset the log
 * buffer (a reset failure is logged).
 */
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasLearnerCb = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL;
  if (hasLearnerCb) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
  }
}
2228

2229
// TLA+ Spec
2230
// \* Candidate i transitions to leader.
2231
// BecomeLeader(i) ==
2232
//     /\ state[i] = Candidate
2233
//     /\ votesGranted[i] \in Quorum
2234
//     /\ state'      = [state EXCEPT ![i] = Leader]
2235
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2236
//                          [j \in Server |-> Len(log[i]) + 1]]
2237
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2238
//                          [j \in Server |-> 0]]
2239
//     /\ elections'  = elections \cup
2240
//                          {[eterm     |-> currentTerm[i],
2241
//                            eleader   |-> i,
2242
//                            elog      |-> log[i],
2243
//                            evotes    |-> votesGranted[i],
2244
//                            evoterLog |-> voterLog[i]]}
2245
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2246
//
2247
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
15,173✔
2248
  int32_t code = 0;
15,173✔
2249
  pSyncNode->becomeLeaderNum++;
15,173✔
2250
  pSyncNode->hbrSlowNum = 0;
15,173✔
2251

2252
  // reset restoreFinish
2253
  pSyncNode->restoreFinish = false;
15,173✔
2254

2255
  // state change
2256
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
15,173✔
2257
  pSyncNode->roleTimeMs = taosGetTimestampMs();
15,173✔
2258

2259
  // set leader cache
2260
  pSyncNode->leaderCache = pSyncNode->myRaftId;
15,173✔
2261
  strncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
15,173✔
2262
          TSDB_FQDN_LEN);
2263
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;
15,173✔
2264

2265
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
33,661✔
2266
    SyncIndex lastIndex;
2267
    SyncTerm  lastTerm;
2268
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
18,488✔
2269
    if (code != 0) {
18,488!
2270
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
×
2271
      return;
×
2272
    }
2273
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
18,488✔
2274
  }
2275

2276
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
33,661✔
2277
    // maybe overwrite myself, no harm
2278
    // just do it!
2279
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
18,488✔
2280
  }
2281

2282
  // init peer mgr
2283
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
15,173!
2284
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2285
    return;
×
2286
  }
2287

2288
#if 0
2289
  // update sender private term
2290
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
2291
  if (pMySender != NULL) {
2292
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
2293
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
2294
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
2295
      }
2296
    }
2297
    (pMySender->privateTerm) += 100;
2298
  }
2299
#endif
2300

2301
  // close receiver
2302
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
15,173!
2303
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2304
  }
2305

2306
  // stop elect timer
2307
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
15,173!
2308
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2309
    return;
×
2310
  }
2311

2312
  // start heartbeat timer
2313
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
15,173!
2314
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2315
    return;
×
2316
  }
2317

2318
  // send heartbeat right now
2319
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
15,173!
2320
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2321
    return;
×
2322
  }
2323

2324
  // call back
2325
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
15,172!
2326
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
15,173✔
2327
  }
2328

2329
  // min match index
2330
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
15,172✔
2331

2332
  // reset log buffer
2333
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
15,172!
2334
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2335
    return;
×
2336
  }
2337

2338
  // trace log
2339
  sNInfo(pSyncNode, "become leader %s", debugStr);
15,173✔
2340
}
2341

2342
/*
 * Transition this node into the assigned-leader role.
 *
 * This mirrors syncNodeBecomeLeader with two differences: restoreFinish is
 * left untouched, and the FSM is notified via FpBecomeAssignedLeaderCb.
 *
 * The steps, in order:
 *  - record itself (id and endpoint) as the cached leader;
 *  - reset the next/match indexes and per-peer send state;
 *  - stop a running new-node snapshot receiver;
 *  - swap the election timer for the heartbeat timer and send an immediate
 *    heartbeat round;
 *  - notify the FSM;
 *  - invalidate minMatchIndex and reset the log buffer.
 *
 * Any failing step is logged and the function returns early without rollback.
 *
 * @param pSyncNode  node to promote
 */
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  // pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache: keep the cached endpoint consistent with leaderCache, as
  // syncNodeBecomeLeader does, instead of leaving a previous leader's endpoint
  const SNodeInfo* pMyNodeInfo = &pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pMyNodeInfo->nodeFqdn, sizeof(pSyncNode->leaderCacheEp.fqdn));
  pSyncNode->leaderCacheEp.port = pMyNodeInfo->nodePort;

  // last index is peer-independent: fetch it once for all next indexes
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2420

2421
/*
 * Promote a candidate that holds a majority of granted votes to leader, then
 * append a no-op log entry via syncNodeAppendNoop.
 *
 * Returns without changes (after logging) when the node is not a candidate or
 * the votes are not a majority. A no-op append failure is logged but does not
 * undo the promotion.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    // Report the code syncNodeAppendNoop returned. terrno may not have been set.
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
2445

2446
// True when this sync node belongs to vgroup 1, which this module treats as the mnode group.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnodeGroup = (1 == pSyncNode->vgId);
  return isMnodeGroup;
}
185,806✔
2447

2448
/*
 * Reset the per-peer send bookkeeping for every voter and learner slot:
 * no index sent yet and no send time recorded. Always returns 0.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < slotCount) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    slot++;
  }

  return 0;
}
2456

2457
/*
 * Move a follower into the candidate role and stamp the role-change time.
 * A node in any other state is left unchanged and an error is logged.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  const bool isFollower = (TAOS_SYNC_STATE_FOLLOWER == pSyncNode->state);
  if (!isFollower) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SSyncLogStore* pStore = pSyncNode->pLogStore;
  SyncIndex      logLast = pStore->syncLogLastIndex(pStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

  sNTrace(pSyncNode, "follower to candidate");
}
2470

2471
/*
 * Promote an assigned leader to a regular leader and append a no-op entry.
 *
 * @return 0 on success, or TSDB_CODE_SYN_INTERNAL_ERROR when the node is not
 *         currently an assigned leader. A no-op append failure is only logged.
 */
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // ", " separator added: the commit index and "assigned commit index" previously ran together
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2489

2490
// just called by syncNodeVoteForSelf
2491
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
2,356✔
2492
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
2,356✔
2493
  if (term != storeTerm) {
2,356!
2494
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2495
    return;
×
2496
  }
2497
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
2,356!
2498
  bool voted = raftStoreHasVoted(pSyncNode);
2,356✔
2499
  if (voted) {
2,356!
2500
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2501
    return;
×
2502
  }
2503

2504
  raftStoreVote(pSyncNode, pRaftId);
2,356✔
2505
}
2506

2507
// simulate get vote from outside
2508
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
2,356✔
2509
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
2,356✔
2510

2511
  SRpcMsg rpcMsg = {0};
2,356✔
2512
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
2,356✔
2513
  if (ret != 0) return;
2,356!
2514

2515
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
2,356✔
2516
  pMsg->srcId = pSyncNode->myRaftId;
2,356✔
2517
  pMsg->destId = pSyncNode->myRaftId;
2,356✔
2518
  pMsg->term = currentTerm;
2,356✔
2519
  pMsg->voteGranted = true;
2,356✔
2520

2521
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
2,356✔
2522
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
2,356✔
2523
  rpcFreeCont(rpcMsg.pCont);
2,356✔
2524
}
2525

2526
// return if has a snapshot
2527
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
26,569✔
2528
  bool      ret = false;
26,569✔
2529
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
26,569✔
2530
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
26,569!
2531
    // TODO check return value
2532
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
26,569✔
2533
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
26,569✔
2534
      ret = true;
3,001✔
2535
    }
2536
  }
2537
  return ret;
26,569✔
2538
}
2539

2540
// return max(logLastIndex, snapshotLastIndex)
2541
// if no snapshot and log, return -1
2542
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
26,652✔
2543
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
26,652✔
2544
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
26,652!
2545
    // TODO check return value
2546
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
26,652✔
2547
  }
2548
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
26,652✔
2549

2550
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
26,652✔
2551
  return lastIndex;
26,652✔
2552
}
2553

2554
// return the last term of snapshot and log
2555
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2556
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
26,569✔
2557
  SyncTerm lastTerm = 0;
26,569✔
2558
  if (syncNodeHasSnapshot(pSyncNode)) {
26,569✔
2559
    // has snapshot
2560
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
3,001✔
2561
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
3,001!
2562
      // TODO check return value
2563
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
3,001✔
2564
    }
2565

2566
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
3,001✔
2567
    if (logLastIndex > snapshot.lastApplyIndex) {
3,001✔
2568
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,687✔
2569
    } else {
2570
      lastTerm = snapshot.lastApplyTerm;
1,314✔
2571
    }
2572

2573
  } else {
2574
    // no snapshot
2575
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
23,568✔
2576
  }
2577

2578
  return lastTerm;
26,569✔
2579
}
2580

2581
// get last index and term along with snapshot
2582
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
23,040✔
2583
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
23,040✔
2584
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
23,040✔
2585
  return 0;
23,040✔
2586
}
2587

2588
#ifdef BUILD_NO_CALL
// First index to try for append-entries: one past the node's last index.
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { return syncNodeGetLastIndex(pSyncNode) + 1; }

// Index immediately before `index`, never lower than SYNC_INDEX_INVALID.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex prevIndex = index - 1;
  return (prevIndex < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : prevIndex;
}

/*
 * Term of the entry before `index`.
 *
 * - index < SYNC_INDEX_BEGIN: returns SYNC_TERM_INVALID.
 * - index == SYNC_INDEX_BEGIN: returns 0.
 * - Otherwise: looks up index - 1 in the log cache, falling back to the log
 *   store. If the entry cannot be read, it falls back to the snapshot when its
 *   lastApplyIndex matches.
 *
 * Any other failure returns SYNC_TERM_INVALID (or -1 for a NULL entry).
 */
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) return SYNC_TERM_INVALID;
  if (index == SYNC_INDEX_BEGIN) return 0;

  SyncIndex       prevIndex = index - 1;
  SSyncRaftEntry* pPrevEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      pHandle = taosLRUCacheLookup(pCache, &prevIndex, sizeof(prevIndex));
  int32_t         code = 0;

  if (pHandle != NULL) {
    pPrevEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, pHandle);
    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", prevIndex, pPrevEntry->bytes, pPrevEntry);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, prevIndex);
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, prevIndex, &pPrevEntry);
  }

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (code == 0) {
    if (pPrevEntry == NULL) return -1;
    SyncTerm prevTerm = pPrevEntry->term;
    // cached entries go back to the cache; entries read from the store are owned here
    if (pHandle != NULL) {
      taosLRUCacheRelease(pCache, pHandle, false);
    } else {
      syncEntryDestroy(pPrevEntry);
    }
    return prevTerm;
  }

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == prevIndex) return snapshot.lastApplyTerm;
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}

// Compute both the previous index and its term for `index`. Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}
#endif
2678

2679
static void syncNodeEqPingTimer(void* param, void* tmrId) {
77,770✔
2680
  if (!syncIsInit()) return;
77,770!
2681

2682
  int64_t    rid = (int64_t)param;
77,770✔
2683
  SSyncNode* pNode = syncNodeAcquire(rid);
77,770✔
2684

2685
  if (pNode == NULL) return;
77,770!
2686

2687
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
77,770!
2688
    SRpcMsg rpcMsg = {0};
77,770✔
2689
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
77,770✔
2690
                                    pNode->pingTimerMS, pNode);
2691
    if (code != 0) {
77,770!
2692
      sError("failed to build ping msg");
×
2693
      rpcFreeCont(rpcMsg.pCont);
×
2694
      goto _out;
×
2695
    }
2696

2697
    // sTrace("enqueue ping msg");
2698
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
77,770✔
2699
    if (code != 0) {
77,770✔
2700
      sError("failed to sync enqueue ping msg since %s", terrstr());
1!
2701
      rpcFreeCont(rpcMsg.pCont);
1✔
2702
      goto _out;
1✔
2703
    }
2704

2705
  _out:
77,769✔
2706
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
77,770!
2707
                     &pNode->pPingTimer))
2708
      sError("failed to reset ping timer");
×
2709
  }
2710
  syncNodeRelease(pNode);
77,770✔
2711
}
2712

2713
static void syncNodeEqElectTimer(void* param, void* tmrId) {
2,369✔
2714
  if (!syncIsInit()) return;
2,381!
2715

2716
  int64_t    rid = (int64_t)param;
2,369✔
2717
  SSyncNode* pNode = syncNodeAcquire(rid);
2,369✔
2718

2719
  if (pNode == NULL) return;
2,369✔
2720

2721
  if (pNode->syncEqMsg == NULL) {
2,365!
2722
    syncNodeRelease(pNode);
×
2723
    return;
×
2724
  }
2725

2726
  int64_t tsNow = taosGetTimestampMs();
2,365✔
2727
  if (tsNow < pNode->electTimerParam.executeTime) {
2,365✔
2728
    syncNodeRelease(pNode);
8✔
2729
    return;
8✔
2730
  }
2731

2732
  SRpcMsg rpcMsg = {0};
2,357✔
2733
  int32_t code =
2734
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
2,357✔
2735

2736
  if (code != 0) {
2,357!
2737
    sError("failed to build elect msg");
×
2738
    syncNodeRelease(pNode);
×
2739
    return;
×
2740
  }
2741

2742
  SyncTimeout* pTimeout = rpcMsg.pCont;
2,357✔
2743
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
2,357!
2744

2745
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2,357✔
2746
  if (code != 0) {
2,357!
2747
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2748
    rpcFreeCont(rpcMsg.pCont);
×
2749
    syncNodeRelease(pNode);
×
2750
    return;
×
2751
  }
2752

2753
  syncNodeRelease(pNode);
2,357✔
2754
}
2755

2756
#ifdef BUILD_NO_CALL
/*
 * (Unused) heartbeat timer callback. `param` carries the sync node rid.
 *
 * For multi-replica groups whose user logic clock has not run ahead of the
 * timer's clock, it enqueues a SYNC_TIMEOUT_HEARTBEAT message and re-arms
 * itself. Previously the acquired node reference was never released; it is
 * now released on every path after a successful acquire.
 */
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode == NULL) return;

  if (pNode->totalReplicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sError("vgId:%d, failed to build heartbeat msg since %s", pNode->vgId, tstrerror(code));
      } else {
        sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
        code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue heartbeat msg since %s", pNode->vgId, tstrerror(code));
          rpcFreeCont(rpcMsg.pCont);
        }
      }

      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
                       &pNode->pHeartbeatTimer)) {
        sError("vgId:%d, failed to reset heartbeat timer", pNode->vgId);
      }
    } else {
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
    }
  }

  syncNodeRelease(pNode);
}
#endif
2796

2797
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
66,404✔
2798
  int32_t code = 0;
66,404✔
2799
  int64_t hbDataRid = (int64_t)param;
66,404✔
2800
  int64_t tsNow = taosGetTimestampMs();
66,404✔
2801

2802
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
66,404✔
2803
  if (pData == NULL) {
66,404!
2804
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2805
    return;
×
2806
  }
2807

2808
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
66,404✔
2809
  if (pSyncNode == NULL) {
66,404✔
2810
    syncHbTimerDataRelease(pData);
4✔
2811
    sError("hb timer get pSyncNode NULL");
4!
2812
    return;
4✔
2813
  }
2814

2815
  SSyncTimer* pSyncTimer = pData->pTimer;
66,400✔
2816

2817
  if (!pSyncNode->isStart) {
66,400!
2818
    syncNodeRelease(pSyncNode);
×
2819
    syncHbTimerDataRelease(pData);
×
2820
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2821
    return;
×
2822
  }
2823

2824
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
66,400!
2825
    syncNodeRelease(pSyncNode);
×
2826
    syncHbTimerDataRelease(pData);
×
2827
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2828
    return;
×
2829
  }
2830

2831
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
66,400!
2832
         pData->destId.addr);
2833

2834
  if (pSyncNode->totalReplicaNum > 1) {
66,400✔
2835
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
66,396✔
2836
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
66,396✔
2837

2838
    if (timerLogicClock == msgLogicClock) {
66,396✔
2839
      if (tsNow > pData->execTime) {
66,391✔
2840
        pData->execTime += pSyncTimer->timerMS;
66,259✔
2841

2842
        SRpcMsg rpcMsg = {0};
66,259✔
2843
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
66,259!
2844
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2845
          syncNodeRelease(pSyncNode);
×
2846
          syncHbTimerDataRelease(pData);
×
2847
          return;
×
2848
        }
2849

2850
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
66,259✔
2851

2852
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
66,259✔
2853
        pSyncMsg->srcId = pSyncNode->myRaftId;
66,259✔
2854
        pSyncMsg->destId = pData->destId;
66,259✔
2855
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
66,259✔
2856
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
66,259✔
2857
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
66,259✔
2858
        pSyncMsg->privateTerm = 0;
66,259✔
2859
        pSyncMsg->timeStamp = tsNow;
66,259✔
2860

2861
        // update reset time
2862
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
66,259✔
2863
        pSyncTimer->timeStamp = tsNow;
66,259✔
2864

2865
        // send msg
2866
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
66,259✔
2867
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
66,259✔
2868
        sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId,
66,259!
2869
                DID(&(pSyncMsg->destId)));
2870
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
66,259✔
2871
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
66,259✔
2872
        if (ret != 0) {
66,259✔
2873
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
99!
2874
        }
2875
      }
2876

2877
      if (syncIsInit()) {
66,391!
2878
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
66,391!
2879
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
66,391✔
2880
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
66,391✔
2881
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
66,391!
2882

2883
      } else {
2884
        sError("sync env is stop, reset peer hb timer error");
×
2885
      }
2886

2887
    } else {
2888
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
5!
2889
             timerLogicClock, msgLogicClock);
2890
    }
2891
  }
2892

2893
  syncHbTimerDataRelease(pData);
66,400✔
2894
  syncNodeRelease(pSyncNode);
66,400✔
2895
}
2896

2897
#ifdef BUILD_NO_CALL
// LRU deleter callback: frees the cached value with taosMemoryFree.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  taosMemoryFree(value);
}

/*
 * Insert pEntry into the log store's LRU cache, keyed by its index and charged
 * for the entry header plus its data. On insert, *h receives the cache handle.
 * Returns 0 when the cache reports TAOS_LRU_STATUS_OK, -1 otherwise.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus =
      taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge, deleteCacheEntry,
                         h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
#endif
2918

2919
/*
 * Build a sync configuration from an alter-replica request.
 *
 * Layout of cfg->nodeInfo:
 *  - slots [0, replica) hold the voters from pReq->replicas;
 *  - slots [replica, replica + learnerReplica) hold the learners from
 *    pReq->learnerReplicas.
 * Each node's identity is refreshed through tmsgUpdateDnodeInfo.
 *
 * cfg->replicaNum counts voters; cfg->totalReplicaNum counts voters plus
 * learners. cfg->myIndex is taken from selfIndex, or from
 * replica + learnerSelfIndex when this node is a learner. An index of -1 means
 * "not set".
 *
 * NOTE(review): no bounds check is done against the nodeInfo capacity; this
 * assumes the request was validated when deserialized — confirm.
 * (The previously unused local `code` has been removed.)
 *
 * @param pReq  source request
 * @param cfg   configuration to fill
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int32_t i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: cfg->totalReplicaNum counts learners copied so far, so it doubles
  // as the index into pReq->learnerReplicas
  for (int32_t i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->totalReplicaNum++;
  }
  cfg->totalReplicaNum += pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
×
2955

2956
/*
 * Inspect a TDMT_SYNC_CONFIG_CHANGE log entry and step down if this leader is
 * no longer part of the new configuration.
 *
 * Membership is checked by matching this node's FQDN and port against every
 * node (voters and learners) in the decoded configuration. The check applies
 * only while this node is leader or assigned leader and the new configuration
 * has at least one node.
 *
 * @return 0 when nothing needs to change; 1 after stepping down;
 *         TSDB_CODE_SYN_INTERNAL_ERROR for a non config-change entry;
 *         TSDB_CODE_INVALID_MSG when the request cannot be deserialized.
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  // Use the exact type expected by tDeserializeSAlterVnodeReplicaReq and
  // syncBuildConfigFromReq (previously declared as SAlterVnodeTypeReq).
  SAlterVnodeReplicaReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.totalReplicaNum >= 1 &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        incfg = true;
        break;
      }
    }

    if (!incfg) {
      SyncTerm currentTerm = raftStoreGetTerm(ths);
      SRaftId  id = EMPTY_RAFT_ID;
      syncNodeStepDown(ths, currentTerm, id);
      return 1;
    }
  }
  return 0;
}
2994

2995
/*
 * Dump this sync node's membership view to the info log. It covers the summary counters,
 * this node's own info, and each peer's node info, endpoint set and raft id. It also covers
 * each replica of the raft cfg (node info and replicasId).
 *
 * ths: sync node whose state is logged.
 * cfg: currently unused; kept for interface compatibility.
 * str: caller-supplied tag included in every log line.
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  (void)cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    // Log the epset of peer i. The previous code dereferenced peersEpset (element 0) for every
    // peer and shadowed the loop index with its inner loop.
    const SEpSet* pEpSet = &ths->peersEpset[i];
    char          buf[256];
    int32_t       len = (int32_t)sizeof(buf);
    int32_t       n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t e = 0; e < pEpSet->numOfEps; ++e) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", pEpSet->eps[e].fqdn, pEpSet->eps[e].port,
                     (e + 1 < pEpSet->numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, pEpSet->inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3041

3042
/*
 * Rebuild the peer tables and the in-memory raft cfg node list from `cfg`.
 *
 * Peers: every member of `cfg` other than this node (matched by fqdn + port) becomes a peer.
 * Its endpoint set and raft id are derived from the copied node info.
 *
 * Raft cfg: the node list is replaced position by position. replicaNum is recounted from the
 * voter roles. myIndex is set to this node's position when it is present; otherwise the old
 * value is kept.
 *
 * Returns 0 on success. If a peer's raft id cannot be determined, returns terrno before
 * peersNum or the raft cfg are updated.
 */
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  int32_t nPeers = 0;

  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort;
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[nPeers];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[nPeers]);

    if (!syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[nPeers])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, nPeers);
      return terrno;
    }

    ++nPeers;
  }
  ths->peersNum = nPeers;

  // Replace the cfg node list one-to-one and recount voters.
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t k = 0;
  for (; k < cfg->totalReplicaNum; ++k) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[k];
    SNodeInfo*       pDst = &ths->raftCfg.cfg.nodeInfo[k];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }
    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = k;
    }
  }
  ths->raftCfg.cfg.totalReplicaNum = k;

  return 0;
}
3090

3091
/*
 * Promote to voter every peer, and every raft-cfg node, that `cfg` lists as a voter
 * (matched by fqdn + port). Roles are only ever raised to voter, never lowered.
 *
 * raftCfg.cfg.replicaNum is recounted. It counts each (cfg node, new voter entry) match, so
 * nodes that are not listed as voters in `cfg` do not contribute.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      const SNodeInfo* pNew = &cfg->nodeInfo[c];
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) {
        continue;
      }
      if (strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) == 0 && pPeer->nodePort == pNew->nodePort) {
        pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }
  }

  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    SNodeInfo* pNode = &ths->raftCfg.cfg.nodeInfo[n];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      const SNodeInfo* pNew = &cfg->nodeInfo[c];
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) {
        continue;
      }
      if (strcmp(pNode->nodeFqdn, pNew->nodeFqdn) == 0 && pNode->nodePort == pNew->nodePort) {
        pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
        ths->raftCfg.cfg.replicaNum++;
      }
    }
  }
}
×
3118

3119
/*
 * Re-derive the per-replica runtime state of a sync node after ths->raftCfg.cfg has been
 * rewritten by a membership change. State belonging to replicas that survive the change
 * (same raft id) is carried over to their new slot.
 *
 * Steps:
 *   1. replicasId and the replica counters, taken from raftCfg.
 *   2-3. match/next index managers; entries of surviving replicas are copied.
 *   4. vote bookkeeping (rebuilt, not copied).
 *   5. log replication managers, moved to the new slot of each surviving replica.
 *   6. snapshot senders, recreated for every slot.
 *   7. per-peer heartbeat timers.
 *   8. peer states, moved like step 5.
 *
 * oldtotalReplicaNum: replica count before the change; bounds the old per-replica state that
 * is consulted while copying.
 *
 * Returns 0 on success. Otherwise returns an error code (stored in terrno via TAOS_RETURN on
 * most paths). After a failure the node may be partially rebuilt.
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t          code = 0;
  SSyncLogReplMgr* oldLogReplMgrs = NULL;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;
  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // Keep the previous manager instead of leaking it and leaving a NULL pointer behind.
    ths->pMatchIndex = oldIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    // Same as above: do not leak the old manager or leave NULL behind.
    ths->pNextIndex = oldNextIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int32_t i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  // Snapshot the old managers by value; the originals are destroyed right after.
  for (int32_t i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    goto _exit;
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    for (int32_t j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;  // the replica may have moved to a new slot
      }
    }
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int32_t i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      // Never report success when a sender slot could not be filled, and release
      // oldLogReplMgrs on this path as well.
      if (code == 0) code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
      goto _exit;
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    goto _exit;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      goto _exit;
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    goto _exit;
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; i++) {
    for (int32_t j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);
  return 0;

_exit:
  taosMemoryFree(oldLogReplMgrs);
  TAOS_RETURN(code);
}
3270

3271
/*
 * Refresh the runtime voter counts after roles were promoted in raftCfg (e.g. 1->3).
 * The replica layout is unchanged, so only the voter counts of the node and of the
 * match/next index managers are updated. The vote bookkeeping is rebuilt, and the log
 * replication managers are left as they are.
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId: only the voter count changes.
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t idx = 0; idx < ths->totalReplicaNum; ++idx) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, idx, ths->replicasId[idx].addr);
  }

  // Index managers: likewise only their voter count changes.
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  SSyncIndexMgr* pNext = ths->pNextIndex;
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  pNext->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t idx = 0; idx < pMatch->totalReplicaNum; ++idx) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, idx, pMatch->index[idx]);
  }

  // Vote bookkeeping is recomputed from the node's current membership.
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs need no change for a learner->voter promotion.
}
×
3295

3296
/*
 * Zero the node-info entries of all current peers and of every replica listed in the raft
 * cfg. The counters (peersNum, raftCfg.cfg.totalReplicaNum) are left unchanged.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t idx = 0; idx < ths->peersNum; ++idx) {
    memset(&ths->peersNodeInfo[idx], 0, sizeof(SNodeInfo));
  }

  for (int32_t idx = 0; idx < ths->raftCfg.cfg.totalReplicaNum; ++idx) {
    memset(&ths->raftCfg.cfg.nodeInfo[idx], 0, sizeof(SNodeInfo));
  }
}
×
3306

3307
/*
 * Apply a TDMT_SYNC_CONFIG_CHANGE raft entry to this node's membership.
 *
 * str: the caller's stage. "Commit" selects a different progress log line.
 *
 * Entries whose changeVersion is not newer than the one already applied are skipped
 * (returns 0). Otherwise the new config is applied as follows:
 *  - 1 or 2 replicas (removal): if this node is still listed, peers and cfg are rebuilt from
 *    the new config. If not, the node keeps only itself, as a learner (replicaNum 0,
 *    totalReplicaNum 1), and enters the learner state.
 *  - else, if the node currently has 3 replicas (role change): members listed as voters are
 *    promoted, and a learner that became a voter moves to the follower state.
 *  - else (addition): peers and cfg are rebuilt from the new config.
 * Afterwards the quorum, lastConfigIndex, cfg.lastIndex and changeVersion are updated, and the
 * cfg file is written.
 *
 * Returns 0 on success or an error code.
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Payload layout: SMsgHead followed by the serialized alter-replica request.
  SMsgHead*          pHead = (SMsgHead*)pEntry->data;
  SAlterVnodeTypeReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(POINTER_SHIFT(pHead, sizeof(SMsgHead)), pHead->contLen, &alterReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&alterReq, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;
  bool    isRemoval = (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2);

  if (isRemoval) {
    bool selfRemains = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum && !selfRemains; ++j) {
      selfRemains = strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
                    ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort;
    }

    // Both removal flavours start from cleared peer/cfg node tables.
    syncNodeResetPeerAndCfg(ths);

    if (selfRemains) {
      // Other replicas are removed; myNodeInfo stays unchanged.
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // This node itself is removed: keep only itself, as a learner, which reduces the chance
      // of it disrupting the remaining group.
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // Same member set, different roles.
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      const SNodeInfo* pNew = &cfg.nodeInfo[j];
      bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pNew->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pNew->nodePort;
      if (isSelf && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    syncNodeChangePeerAndCfgToVoter(ths, &cfg);
    syncNodeChangeToVoter(ths);

    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // New members are added; myNodeInfo and the state stay unchanged.
    sInfo("vgId:%d, begin add replica", ths->vgId);

    if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
      TAOS_RETURN(code);
    }
    if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
      TAOS_RETURN(code);
    }

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  if ((code = syncWriteCfgFile(ths)) != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3480

3481
/*
 * Append a raft entry to the log buffer, then advance the match index and, where applicable,
 * the commit index.
 *
 * On the two failure paths (missing SMsgHead, log-buffer append failure) the entry is destroyed
 * here. On success it is not freed here; presumably the log buffer owns it from then on
 * (confirm against syncLogBufferAppend).
 *
 * The log buffer is proceeded in every case. An assigned leader then commits up to its
 * assigned commit index. Nodes with replicaNum > 1 return after that. Otherwise the commit
 * index is updated from the new match index and committed immediately.
 *
 * Returns 0 on success or an error code. NOTE(review): on the single-replica path the final
 * commit result replaces any earlier error code.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = 0;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    // Hand the failure to the fsm for this entry (presumably so the request gets answered) — confirm.
    int32_t fsmRet = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (fsmRet != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(fsmRet));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, with replicating on needed
  SyncIndex       lastMatch = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = (pEntry != NULL) ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  bool fsmReady = false;
  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t assignedIndex = syncNodeUpdateAssignedCommitIndex(ths, lastMatch);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, assignedIndex, pMsg);

    fsmReady = (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE);
    if (fsmReady && syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId,
              assignedIndex, pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // Multi-replica: the commit index is not advanced here.
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // Single replica: the local match index is already a majority.
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, lastMatch);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          lastMatch, pMsg, returnIndex);

  fsmReady = (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE);
  if (fsmReady && (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry")) < 0) {
    sGError(trace,
            "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
            ths->vgId, lastMatch, pMsg, ths->commitIndex, returnIndex);
  }

  TAOS_RETURN(code);
}
3549

3550
/*
 * Report whether at least `quorum` voter peers have not been heard from within
 * tsHeartbeatTimeout milliseconds, based on the receive times kept in pMatchIndex.
 * Learners are ignored, as are peers whose recorded receive time is 0 or -1.
 * A single-replica node never times out.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t staleVoters = 0;
  int64_t nowMs = taosGetTimestampMs();

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    if (pSyncNode->peersNodeInfo[idx].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecvMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[idx]));
    bool    hasRecord = (lastRecvMs != 0 && lastRecvMs != -1);
    if (hasRecord && nowMs - lastRecvMs > tsHeartbeatTimeout) {
      ++staleVoters;
    }
  }

  return staleVoters >= pSyncNode->quorum;
}
3575

3576
/*
 * Return true if any of the node's first totalReplicaNum snapshot senders exists and has
 * its `start` flag set. A NULL node yields false.
 */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t idx = 0; idx < pSyncNode->totalReplicaNum; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}
3587

3588
/*
 * Return true if the node has a new-node snapshot receiver whose `start` flag is set.
 * A NULL node, or a missing receiver, yields false.
 */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}
3594

3595
/*
 * Build a no-op entry for the current term, indexed at the log buffer's end index, and
 * append it through syncNodeAppend.
 * Returns TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built; otherwise returns
 * syncNodeAppend's result.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        curTerm = raftStoreGetTerm(ths);
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);

  int32_t code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop, NULL);
  TAOS_RETURN(code);
}
3609

3610
#ifdef BUILD_NO_CALL
/*
 * Legacy no-op append path (compiled only with BUILD_NO_CALL).
 *
 * Builds a no-op entry at the log store's write index. When this node is leader, the entry is
 * appended to the log store and cached. The cache handle is released if one was obtained;
 * otherwise the entry is destroyed.
 *
 * Returns 0 on success, or -1 if the entry cannot be built or appended.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // Previously leaked here. Assumes the log store does not keep pEntry when the append
      // fails (confirm).
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
#endif
3640

3641
/*
 * Handle a sync heartbeat from another member.
 *
 * 1. Records the heartbeat's arrival time and count in pNextIndex, then drops the message
 *    (returning 0) if the sender is not part of this raft group.
 * 2. A learner adopts a higher term carried by the heartbeat.
 * 3. If the term matches and this node is not (assigned) leader, it stores minMatchIndex and
 *    re-arms the election timer at the end. A follower or learner also enqueues a local
 *    commit command with the sender's commit index.
 * 4. An (assigned) leader that sees an equal or higher term enqueues a local step-down command.
 * 5. Replies with a heartbeat reply stamped with the receive time.
 *
 * Local command messages that cannot be enqueued (enqueue failure, or no syncEqMsg/msgcb) are
 * freed here.
 *
 * Returns 0, or the error from building or sending a message.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                  pMsg->commitIndex, pMsg->term);
        }
      } else {
        // Nobody will consume the message: release it instead of leaking it.
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // Nobody will consume the message: release it instead of leaking it.
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else if (tsSyncLogHeartbeat) {
    sGInfo(&rpcMsg.info.traceId,
           "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
           ", processTime:%" PRId64,
           ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    sGDebug(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3781

3782
/*
 * Handle a heartbeat reply from a peer.
 *
 * Looks up the peer's log replication manager, refreshes the peer's receive time in
 * pMatchIndex, bumps its receive counter in pNextIndex, and passes the reply on to the log
 * replication manager.
 *
 * Returns the result of syncLogReplProcessHeartbeatReply. If no log replication manager
 * exists for the sender, returns an error (terrno if set, else
 * TSDB_CODE_SYN_RETURN_VALUE_NULL).
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "0x%016" zero-pads the address to 16 hex digits; the former "0x016%" printed a literal "016".
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  // NOTE(review): the receive time goes to pMatchIndex but the counter goes to pNextIndex,
  // whereas syncNodeOnHeartbeat uses pNextIndex for both. Confirm this split is intended.
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3803

3804
#ifdef BUILD_NO_CALL
/*
 * Legacy heartbeat-reply handler. It is compiled only when BUILD_NO_CALL is
 * defined.
 *
 * Logs the reply and stamps the peer's last receive time in ths->pMatchIndex.
 * Always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();
  int64_t             elapsedMs = nowMs - pReply->timeStamp;

  // The same delta is reported for both the message delay and the receive interval.
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // Record the last reply time. It is used to decide whether the peer is alive.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3817

3818
/*
 * Handle a locally issued sync command carried in pRpcMsg->pCont.
 *
 * - SYNC_LOCAL_CMD_STEP_DOWN: step this node down to pMsg->currentTerm.
 * - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT:
 *     1. Advance the commit index to pMsg->commitIndex, but only when
 *        pMsg->currentTerm equals the log buffer's last match term.
 *     2. Commit buffered entries, unless the FSM is in the incomplete state.
 * - Any other command is logged as an error.
 *
 * Returns 0 in all cases except one: TSDB_CODE_SYN_INTERNAL_ERROR when the
 * last match term is negative.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Only move the commit index forward when the command's term matches the term
  // of the last matched entry in the local log buffer.
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex updatedIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, updatedIndex);
  }

  bool fsmReady = (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE);
  if (fsmReady && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}
3850

3851
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
3861

3862
/*
 * Append a client request, or another replicated RPC message, to the raft log.
 *
 * Builds a raft entry at the log buffer's end index using the current term:
 *   - TDMT_SYNC_CLIENT_REQUEST payloads use syncEntryBuildFromClientRequest;
 *   - all other messages use syncEntryBuildFromRpcMsg.
 *
 * For TDMT_SYNC_CONFIG_CHANGE, the RPC message is saved in the response manager
 * so the reply can be sent later. The returned seqNum is stored on the entry.
 *
 * Only a leader (or assigned leader) appends. In that case *pRetIndex (if
 * non-NULL) is set to the index the entry was built at. This happens before any
 * config-change checks run.
 *
 * Every early-exit path destroys pEntry here. After syncNodeAppend the entry is
 * not destroyed locally (presumably syncNodeAppend takes ownership; confirm).
 *
 * Returns one of:
 *   - the result of syncNodeAppend;
 *   - the non-zero result of syncNodeCheckChangeConfig when it rejects (< 0) or
 *     short-circuits (> 0) a config change;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR when the entry cannot be built or this node
 *     is not a leader.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2: a config change is added in the write thread and continues in the sync
  // thread, so the request message must be saved for the later reply.
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    // Assign to the function-scope `code` rather than declaring a shadowing local.
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // The change needs no log entry. Reply now using the stub saved above.
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry, pMsg);
  return code;
}
3926

3927
/*
 * Map a sync role/state to a printable name.
 * Unrecognized values yield "unknown". The returned string is a static literal.
 */
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      break;
  }

  return name;
}
3947

3948
/*
 * Locate this node in a new replica configuration.
 *
 * Sets pNewCfg->myIndex to the first slot whose address (with this node's vgId)
 * matches ths->myRaftId, then returns 0. If no slot matches, pNewCfg is left
 * untouched and TSDB_CODE_SYN_INTERNAL_ERROR is returned.
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t found = -1;

  for (int32_t slot = 0; found < 0 && slot < pNewCfg->totalReplicaNum; ++slot) {
    SRaftId candidate = {.addr = SYNC_ADDR(&pNewCfg->nodeInfo[slot]), .vgId = ths->vgId};
    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      found = slot;
    }
  }

  if (found < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  pNewCfg->myIndex = found;
  return 0;
}
3963

3964
/*
 * Return true when the single-replica fast path applies. All three conditions
 * must hold:
 *   - the group has exactly one replica;
 *   - the message type is a user commit (per syncUtilUserCommit);
 *   - the vnode is not vgId 1.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}
3967

3968
/*
 * Return true if pRaftId matches any entry in ths->replicasId[0..totalReplicaNum).
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;
  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      return true;
    }
    ++idx;
  }
  return false;
}
3976

3977
/*
 * Return the snapshot sender for the replica matching pDestId, or NULL if none
 * matches. If several slots match, the highest-indexed one wins.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  // Scanning backwards and stopping at the first hit gives the same result as
  // a forward scan that keeps the last hit.
  for (int32_t i = ths->totalReplicaNum - 1; i >= 0; --i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      return ths->senders[i];
    }
  }
  return NULL;
}
3986

3987
/*
 * Return a pointer to the peer heartbeat timer for the replica matching pDestId,
 * or NULL if none matches. If several slots match, the highest-indexed one wins.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  // Scanning backwards and stopping at the first hit gives the same result as
  // a forward scan that keeps the last hit.
  for (int32_t i = ths->totalReplicaNum - 1; i >= 0; --i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      return &ths->peerHeartbeatTimerArr[i];
    }
  }
  return NULL;
}
3996

3997
/*
 * Return a pointer to the peer state for the replica matching pDestId, or NULL
 * if none matches. If several slots match, the highest-indexed one wins.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  // Scanning backwards and stopping at the first hit gives the same result as
  // a forward scan that keeps the last hit.
  for (int32_t i = ths->totalReplicaNum - 1; i >= 0; --i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      return &ths->peerStates[i];
    }
  }
  return NULL;
}
4006

4007
#ifdef BUILD_NO_CALL
/*
 * Decide whether an append-entries message should be sent to pDestId.
 *
 * Returns false in two cases:
 *   - the peer is unknown;
 *   - the same index (prevLogIndex + 1) was sent to it less than
 *     SYNC_APPEND_ENTRIES_TIMEOUT_MS ago.
 * Otherwise returns true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSend = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();
  bool      recentlySent =
      (pPeer->lastSendIndex == nextSend) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !recentlySent;
}

/*
 * Return true when a membership change may start. Returns false if any of the
 * following holds:
 *   - a change is already in progress;
 *   - some entries exist (commitIndex >= SYNC_INDEX_BEGIN) but are not all
 *     committed;
 *   - any peer has an active snapshot sender.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasCommitted = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommitted && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSender != NULL && pSender->start) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc