• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.02
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Opens a sync node from pSyncInfo, adds it through syncNodeAdd() and copies the
// ping/elect/heartbeat intervals and the message callbacks from pSyncInfo to the node.
// Returns the rid assigned by syncNodeAdd(), or -1 when the node cannot be opened or added.
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // the node was opened but could not be added, so close it again
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
13,654✔
88
  int32_t    code = 0;
13,654✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,654✔
90
  if (pSyncNode == NULL) {
13,654!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
13,654✔
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
13,655!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
13,653!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
13,654✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
13,654✔
111

112
  TAOS_RETURN(code);
13,654✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
// Copies the current raft configuration (raftCfg.cfg) of the node identified by rid into *cfg.
// Returns 0 on success; when rid cannot be acquired, returns terrno if it is set,
// otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
    TAOS_RETURN(code);
  }

  *cfg = pNode->raftCfg.cfg;
  syncNodeRelease(pNode);

  return 0;
}
136

137
void syncStop(int64_t rid) {
13,654✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,654✔
139
  if (pSyncNode != NULL) {
13,654!
140
    pSyncNode->isStart = false;
13,654✔
141
    syncNodeRelease(pSyncNode);
13,654✔
142
    syncNodeRemove(rid);
13,654✔
143
  }
144
}
13,650✔
145

146
void syncPreStop(int64_t rid) {
13,653✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,653✔
148
  if (pSyncNode != NULL) {
13,654!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
13,654!
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
152
    }
153
    syncNodePreClose(pSyncNode);
13,654✔
154
    syncNodeRelease(pSyncNode);
13,654✔
155
  }
156
}
13,654✔
157

158
void syncPostStop(int64_t rid) {
11,907✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
11,907✔
160
  if (pSyncNode != NULL) {
11,907!
161
    syncNodePostClose(pSyncNode);
11,907✔
162
    syncNodeRelease(pSyncNode);
11,905✔
163
  }
164
}
11,901✔
165

166
// A new config is accepted only when syncNodeInConfig() holds for it and its
// replicaNum differs from the node's current replicaNum by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  return syncNodeInConfig(pSyncNode, pCfg) && abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
}
170

171
// Applies a new replica configuration to the sync node identified by rid.
//
// - Returns 0 without changes when the node's lastConfigIndex is already >= pNewCfg->lastIndex.
// - Returns TSDB_CODE_SYN_NEW_CONFIG_ERROR when the new config is rejected by
//   syncNodeCheckNewConfig() or syncNodeDoConfigChange() fails.
// - When the node is (assigned) leader, the heartbeat timer is stopped, every peer
//   heartbeat timer is re-initialised and the heartbeat timer is started again.
//
// The reference obtained from syncNodeAcquire() is released on every exit path, and the
// node is never dereferenced after the release.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // log before releasing: the node must not be read once the reference is dropped
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != TSDB_CODE_SUCCESS) goto _exit;

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != TSDB_CODE_SUCCESS) goto _exit;

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != TSDB_CODE_SUCCESS) goto _exit;
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != TSDB_CODE_SUCCESS) goto _exit;
    // syncNodeReplicate(pSyncNode);
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
// Dispatches one sync RPC message to the handler matching pMsg->msgType for the sync
// node identified by rid.
//
// Returns the handler's code, TSDB_CODE_MSG_NOT_PROCESSED for an unknown message type,
// or terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL when sync is not initialised or rid
// cannot be acquired.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
  }

  // Log before releasing: pSyncNode->vgId must not be read after syncNodeRelease(),
  // because the release may drop the last reference to the node.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
13,654✔
289
  int32_t    code = 0;
13,654✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,654✔
291
  if (pSyncNode == NULL) {
13,654!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
13,654✔
298
  syncNodeRelease(pSyncNode);
13,654✔
299
  return ret;
13,654✔
300
}
301

302
// Switches the node to follower with a zeroed leader id, then answers the request with
// code 0, reusing the response buffer and length stored in pRpcMsg->info. Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId emptyId = {0};
  syncNodeBecomeFollower(ths, emptyId, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
316

317
// Handles a "set assigned leader" request.
//
// The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its arbTerm is
// lower than the local one or, unless req.force is set, when its member token does not
// match the local arbToken. Otherwise the node advances its term, becomes assigned
// leader (if it is not already) and appends a noop entry.
//
// A response is always sent through tmsgSendRsp(); the function returns the same code
// that was placed in the response. On every failure path the response carries no
// payload (pCont == NULL, contLen == 0).
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    // NOTE(review): failure is detected through terrno only; a stale terrno left by an
    // earlier call on this thread would presumably be treated as a failure here - confirm.
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // A failed noop append is only logged; the request still succeeds below.
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  // rsp borrows the token pointers from req; req is freed only after serialization.
  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // never send a non-positive length with a NULL payload
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // the freed buffer must not be handed to tmsgSendRsp() below
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
399

400
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
401
  int32_t    code = 0;
×
402
  SSyncNode* pNode = syncNodeAcquire(rid);
×
403
  if (pNode == NULL) {
×
404
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
405
    if (terrno != 0) code = terrno;
×
406
    TAOS_RETURN(code);
×
407
  }
408

409
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
410
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
411
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
412

413
  syncNodeRelease(pNode);
×
414
  if (ret == 1) {
×
415
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
416
    code = rpcSendResponse(&rpcMsg);
×
417
    return code;
×
418
  } else {
419
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
420
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
421
  }
422
}
423

424
// Scans the match index of every peer and returns the smallest one seen.
// The first peer's index is taken unconditionally; later peers replace it only when
// their index is positive and smaller. Returns SYNC_INDEX_INVALID when there are no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex peerIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));

    bool nothingYet = (lowest == SYNC_INDEX_INVALID);
    if (nothingYet || (peerIndex > 0 && peerIndex < lowest)) {
      lowest = peerIndex;
    }
  }

  return lowest;
}
437

438
// Thin wrapper: forwards to the log store's syncLogIndexRetention callback with the
// given byte budget and returns whatever index that callback reports.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  return pSyncNode->pLogStore->syncLogIndexRetention(
      pSyncNode->pLogStore, bytes);
}
441

442
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
31,918✔
443
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
31,918✔
444
  int32_t    code = 0;
31,919✔
445
  if (pSyncNode == NULL) {
31,919✔
446
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
40✔
447
    if (terrno != 0) code = terrno;
40!
448
    sError("sync begin snapshot error");
40!
449
    TAOS_RETURN(code);
40✔
450
  }
451

452
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
31,879✔
453
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
31,878✔
454
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
31,877✔
455

456
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
31,877!
457
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
49!
458
    syncNodeRelease(pSyncNode);
49✔
459
    return 0;
49✔
460
  }
461

462
  int64_t logRetention = 0;
31,828✔
463

464
  if (syncNodeIsMnode(pSyncNode)) {
31,828✔
465
    // mnode
466
    logRetention = tsMndLogRetention;
3,174✔
467
  } else {
468
    // vnode
469
    if (pSyncNode->replicaNum > 1) {
28,654✔
470
      logRetention = SYNC_VNODE_LOG_RETENTION;
363✔
471
    }
472
  }
473

474
  if (pSyncNode->totalReplicaNum > 1) {
31,828✔
475
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
805✔
476
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
150!
477
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
13!
478
              lastApplyIndex);
479
      syncNodeRelease(pSyncNode);
13✔
480
      return 0;
13✔
481
    }
482
    SyncIndex retentionIndex =
792✔
483
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
792✔
484
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
792✔
485
  }
486

487
_DEL_WAL:
31,023✔
488

489
  do {
490
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
31,815✔
491
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
31,815✔
492
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
31,815✔
493
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
31,815✔
494
    if (lastApplyIndex <= walCommitVer) {
31,815!
495
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
31,815✔
496

497
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
31,814!
498
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
31,814✔
499
        pSyncNode->snapshottingTime = taosGetTimestampMs();
31,816✔
500

501
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
31,816✔
502
        if (code == 0) {
31,816!
503
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
31,816✔
504
                  pSyncNode->snapshottingIndex, lastApplyIndex);
505
        } else {
506
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
507
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
508
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
509
        }
510

511
      } else {
512
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
513
                snapshottingIndex, lastApplyIndex);
514
      }
515
    }
516
  } while (0);
517

518
  syncNodeRelease(pSyncNode);
31,816✔
519
  TAOS_RETURN(code);
31,817✔
520
}
521

522
int32_t syncEndSnapshot(int64_t rid) {
31,879✔
523
  int32_t    code = 0;
31,879✔
524
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
31,879✔
525
  if (pSyncNode == NULL) {
31,879!
526
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
527
    if (terrno != 0) code = terrno;
×
528
    sError("sync end snapshot error");
×
529
    TAOS_RETURN(code);
×
530
  }
531

532
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
31,879✔
533
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
31,817✔
534
    code = walEndSnapshot(pData->pWal);
31,817✔
535
    if (code != 0) {
31,817!
536
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
537
      syncNodeRelease(pSyncNode);
×
538
      TAOS_RETURN(code);
×
539
    } else {
540
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
31,817✔
541
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
31,817✔
542
    }
543
  }
544

545
  syncNodeRelease(pSyncNode);
31,879✔
546
  TAOS_RETURN(code);
31,879✔
547
}
548

549
// Tells whether the node can serve reads: it must be leader or assigned leader and must
// have finished restoring. On a false result terrno is set to the reason
// (INTERNAL_ERROR for a NULL node, NOT_LEADER, or RESTORING).
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
568

569
bool syncIsReadyForRead(int64_t rid) {
16,155,968✔
570
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
16,155,968✔
571
  if (pSyncNode == NULL) {
16,158,915!
572
    sError("sync ready for read error");
×
573
    return false;
×
574
  }
575

576
  bool ready = syncNodeIsReadyForRead(pSyncNode);
16,158,915✔
577

578
  syncNodeRelease(pSyncNode);
16,158,979✔
579
  return ready;
16,157,980✔
580
}
581

582
#ifdef BUILD_NO_CALL
583
bool syncSnapshotSending(int64_t rid) {
584
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
585
  if (pSyncNode == NULL) {
586
    return false;
587
  }
588

589
  bool b = syncNodeSnapshotSending(pSyncNode);
590
  syncNodeRelease(pSyncNode);
591
  return b;
592
}
593

594
bool syncSnapshotRecving(int64_t rid) {
595
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
596
  if (pSyncNode == NULL) {
597
    return false;
598
  }
599

600
  bool b = syncNodeSnapshotRecving(pSyncNode);
601
  syncNodeRelease(pSyncNode);
602
  return b;
603
}
604
#endif
605

606
// Picks a peer and asks syncNodeLeaderTransferTo() to hand leadership to it.
// Only acts when this node is leader with replicaNum > 1; otherwise returns 0.
// The target is peer 0, except with exactly two peers, where peer 1 is chosen if its
// match index is strictly larger than peer 0's.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
627

628
// Builds a leader-transfer message naming newLeader and proposes it through
// syncNodePropose(). Returns TSDB_CODE_SYN_INTERNAL_ERROR when replicaNum is 1, the
// error of syncBuildLeaderTransfer() if building fails, otherwise the propose result.
// The message content is freed here after proposing.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  // fill in the target of the transfer
  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newNodeInfo = newLeader;
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t result = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return result;
}
648

649
SSyncState syncGetState(int64_t rid) {
6,276,478✔
650
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
6,276,478✔
651

652
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,276,478✔
653
  if (pSyncNode != NULL) {
6,277,864✔
654
    state.state = pSyncNode->state;
6,277,689✔
655
    state.roleTimeMs = pSyncNode->roleTimeMs;
6,277,689✔
656
    state.startTimeMs = pSyncNode->startTime;
6,277,689✔
657
    state.restored = pSyncNode->restoreFinish;
6,277,689✔
658
    if (pSyncNode->vgId != 1) {
6,277,689✔
659
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,357,706✔
660
    } else {
661
      state.canRead = state.restored;
4,919,983✔
662
    }
663
    /*
664
    double progress = 0;
665
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
666
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
667
      state.progress = (int32_t)(progress * 100);
668
    }
669
    else{
670
      state.progress = -1;
671
    }
672
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
673
            "progress:%lf, progress:%d",
674
          pSyncNode->vgId,
675
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
676
    */
677
    state.term = raftStoreGetTerm(pSyncNode);
6,277,634✔
678
    syncNodeRelease(pSyncNode);
6,277,996✔
679
  }
680

681
  return state;
6,277,907✔
682
}
683

684
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
1,166,108✔
685
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,166,108✔
686
  if (pSyncNode != NULL) {
1,166,108!
687
    *syncCommitIndex = pSyncNode->commitIndex;
1,166,108✔
688
    syncNodeRelease(pSyncNode);
1,166,108✔
689
  }
690
}
1,166,108✔
691

692
int32_t syncGetArbToken(int64_t rid, char* outToken) {
56,844✔
693
  int32_t    code = 0;
56,844✔
694
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
56,844✔
695
  if (pSyncNode == NULL) {
56,844!
696
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
697
    if (terrno != 0) code = terrno;
×
698
    TAOS_RETURN(code);
×
699
  }
700

701
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
56,844✔
702
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
56,844✔
703
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
56,844✔
704
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
56,844✔
705

706
  syncNodeRelease(pSyncNode);
56,844✔
707
  TAOS_RETURN(code);
56,844✔
708
}
709

710
int32_t syncCheckSynced(int64_t rid) {
4✔
711
  int32_t    code = 0;
4✔
712
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4✔
713
  if (pSyncNode == NULL) {
4!
714
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
715
    if (terrno != 0) code = terrno;
×
716
    TAOS_RETURN(code);
×
717
  }
718

719
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
4!
720
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
721
    syncNodeRelease(pSyncNode);
×
722
    TAOS_RETURN(code);
×
723
  }
724

725
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
4✔
726
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
4!
727

728
  syncNodeRelease(pSyncNode);
4✔
729
  TAOS_RETURN(code);
4✔
730
}
731

732
// Raises the node's arbTerm to arbTerm if the given value is larger; never lowers it.
// Returns 0, or terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL when rid cannot be acquired.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  int32_t    code = 0;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }

  syncNodeRelease(pNode);
  TAOS_RETURN(code);
}
745

746
// Finds the config index that applies to a snapshot taken at snapshotLastApplyIndex.
// Starts from configIndexArr[0] and moves to any later-seen entry that is larger than
// the current pick but still <= snapshotLastApplyIndex.
// Returns -2 and sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when no config index is recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex picked = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate > picked && candidate <= snapshotLastApplyIndex) {
      picked = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, picked);

  return picked;
}
766

767
// Look up the raft id of the peer whose fqdn and port both match `pEp`.
//
// Only the peer list is searched (peersNodeInfo / peersId). When no peer
// matches, EMPTY_RAFT_ID is returned.
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t k = 0;
  while (k < pSyncNode->peersNum) {
    SNodeInfo* pPeer = &pSyncNode->peersNodeInfo[k];
    bool       sameHost = (strcmp(pEp->fqdn, pPeer->nodeFqdn) == 0);
    if (sameHost && pEp->port == pPeer->nodePort) {
      return pSyncNode->peersId[k];
    }
    k++;
  }
  return EMPTY_RAFT_ID;
}
776

777
// Fill `pEpSet` with the endpoints a client should retry against.
//
// Every configured replica that is not a learner is copied into the set, in
// configuration order. `inUse` is then chosen as follows:
//   1. the slot of the peer that matches the cached leader (non-zero id), or
//   2. this node's own config index when the cached leader is this node, or
//   3. the slot after this node's config index, wrapped by the set size.
// The set is sorted with epsetSort() afterwards.
//
// `pEpSet->numOfEps` is reset to 0 first, so it stays 0 when the sync node
// cannot be acquired.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  int       leaderSlot = -1;
  int       slot = 0;

  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);

    // remember the slot of the endpoint that is the cached leader, ignoring empty ids
    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
    bool    isCachedLeader = (id.addr == pSyncNode->leaderCache.addr) && (id.vgId == pSyncNode->leaderCache.vgId);
    if (isCachedLeader && id.addr != 0 && id.vgId != 0) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
               pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
817

818
// Propose `pMsg` on the sync node identified by `rid`.
//
// This is a thin wrapper: it acquires the node, forwards to
// syncNodePropose() and releases the node again. The return value is
// whatever syncNodePropose() returned.
//
// When the node cannot be acquired, the result is terrno if it is non-zero,
// otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t proposeRet = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return proposeRet;
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("sync propose error");
  TAOS_RETURN(code);
}
832

833
int32_t syncCheckMember(int64_t rid) {
×
834
  int32_t    code = 0;
×
835
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
836
  if (pSyncNode == NULL) {
×
837
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
838
    if (terrno != 0) code = terrno;
×
839
    sError("sync propose error");
×
840
    TAOS_RETURN(code);
×
841
  }
842

843
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
844
    syncNodeRelease(pSyncNode);
×
845
    return TSDB_CODE_SYN_WRONG_ROLE;
×
846
  }
847

848
  syncNodeRelease(pSyncNode);
×
849
  return 0;
×
850
}
851

852
int32_t syncIsCatchUp(int64_t rid) {
4,487✔
853
  int32_t    code = 0;
4,487✔
854
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,487✔
855
  if (pSyncNode == NULL) {
4,487!
856
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
857
    if (terrno != 0) code = terrno;
×
858
    sError("sync Node Acquire error since %d", ERRNO);
×
859
    TAOS_RETURN(code);
×
860
  }
861

862
  int32_t isCatchUp = 0;
4,487✔
863
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
4,487!
864
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,143!
865
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,143✔
866
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
4,230!
867
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
868
          pSyncNode->pLogBuf->matchIndex);
869
    isCatchUp = 0;
4,230✔
870
  } else {
871
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
257!
872
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
873
    isCatchUp = 1;
257✔
874
  }
875

876
  syncNodeRelease(pSyncNode);
4,487✔
877
  return isCatchUp;
4,487✔
878
}
879

880
ESyncRole syncGetRole(int64_t rid) {
4,487✔
881
  int32_t    code = 0;
4,487✔
882
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,487✔
883
  if (pSyncNode == NULL) {
4,487!
884
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
885
    if (terrno != 0) code = terrno;
×
886
    sError("sync Node Acquire error since %d", ERRNO);
×
887
    TAOS_RETURN(code);
×
888
  }
889

890
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
4,487✔
891

892
  syncNodeRelease(pSyncNode);
4,487✔
893
  return role;
4,487✔
894
}
895

896
int64_t syncGetTerm(int64_t rid) {
22,584✔
897
  int32_t    code = 0;
22,584✔
898
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
22,584✔
899
  if (pSyncNode == NULL) {
22,584!
900
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
901
    if (terrno != 0) code = terrno;
×
902
    sError("sync Node Acquire error since %d", ERRNO);
×
903
    TAOS_RETURN(code);
×
904
  }
905

906
  int64_t term = raftStoreGetTerm(pSyncNode);
22,584✔
907

908
  syncNodeRelease(pSyncNode);
22,584✔
909
  return term;
22,584✔
910
}
911

912
// Propose a client message on an already acquired sync node.
//
// Preconditions checked here, in order:
//   - the node must be leader or assigned leader (else TSDB_CODE_SYN_NOT_LEADER)
//   - restore must have finished (else TSDB_CODE_SYN_PROPOSE_NOT_READY)
//   - unless it is an assigned leader, heartbeat replies must not have timed
//     out (else TSDB_CODE_SYN_PROPOSE_NOT_READY)
//
// Two paths follow:
//   - Optimized single-replica path: the request is handled inline through
//     syncNodeOnClientRequest(). On success pMsg->info.conn.applyIndex and
//     applyTerm are filled in; the function returns 1 while replicaNum is
//     still 1 and 0 otherwise. A negative result from
//     syncNodeOnClientRequest() is reported as TSDB_CODE_SYN_INTERNAL_ERROR.
//   - Normal path: a response stub is registered, the client request is
//     built and enqueued through pSyncNode->syncEqMsg. `*seq` (when not
//     NULL) receives the sequence number of the stub.
//
// Returns 0/1 as described above or a non-zero error code.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
           " type:%s, "
           "handle:%p",
           pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, then build and enqueue the request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub, but keep reporting the serialize failure: assigning the
    // result of syncRespMgrDel() to `code` would turn this failure into a
    // success whenever the removal itself succeeds.
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    TAOS_RETURN(code);
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
984

985
// Reset a per-peer heartbeat timer to its initial, not-yet-started state.
//
// The timer gets no underlying timer handle, a zero counter and logic clock,
// the node's heartbeat base line as interval, syncNodeEqPeerHeartbeatTimer
// as callback, `destId` as destination and the current time as timestamp.
//
// Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // identity and callback
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  // timing
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timeStamp = taosGetTimestampMs();

  // runtime state
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
995

996
// Arm the heartbeat timer `pSyncTimer` of `pSyncNode`.
//
// The timer data record referenced by pSyncTimer->hbDataRid is reused when it
// can be acquired; otherwise a new record is allocated and registered with
// syncHbTimerDataAdd(). The record is then filled from the timer and the
// underlying timer is (re)armed with taosTmrResetPriority(), passing the
// record's rid as the callback parameter.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the record cannot be
// allocated, and TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is not
// initialized or taosTmrResetPriority() reports `stopped`.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // without this check the assignments below would dereference NULL
        sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
           pData->destId.addr, pSyncTimer->timerMS);

    bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                        syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
    if (stopped) {
      sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  } else {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return code;
}
1029

1030
// Stop the heartbeat timer `pSyncTimer` and detach its timer data record.
//
// The logic clock is bumped first, then the underlying timer is stopped, the
// handle is cleared, the data record is removed and hbDataRid is reset to -1.
//
// Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1040

1041
// Rebuild the log store from the snapshot position when the two disagree.
//
// The snapshot's lastApplyIndex is taken as the commit index. The log store
// is restored from that index when any of these holds:
//   - the last log index is behind the commit index,
//   - the first log index is beyond commit index + 1,
//   - the fsm state is SYNC_FSM_STATE_INCOMPLETE.
//
// Returns 0 when nothing had to be done or the restore succeeded,
// TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the fsm or its
// FpGetSnapshotInfo callback is missing, or the code reported by
// syncLogRestoreFromSnapshot().
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snap = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);

  SyncIndex commitIndex = snap.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logOutOfRange = (lastVer < commitIndex) || (firstVer > commitIndex + 1);
  bool needRestore = logOutOfRange || (pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE);

  int32_t code = 0;
  if (needRestore) {
    code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex);
    if (code != 0) {
      sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
             pNode->vgId, terrstr(), lastVer, commitIndex);
    }
  }
  TAOS_RETURN(code);
}
1071

1072
// open/close --------------
1073
// Allocate and initialize a sync node from `pSyncInfo`.
//
// Steps, in order:
//   1. make sure the sync directory exists and derive the raft store / raft
//      config file paths from it;
//   2. create raft_config.json from `pSyncInfo` when it does not exist yet,
//      otherwise load it and, when `vnodeVersion` is newer than the version
//      stored in the file, reconcile it with the config passed in;
//   3. refresh dnode info of every replica and persist the config if anything
//      changed;
//   4. build replica/peer ids, the log buffer, raft store, vote and index
//      managers, log store, timers, response manager, snapshot senders and
//      receiver, replication managers and peer state;
//   5. initialize the log buffer and mark the node as started.
//
// Ownership: on success pSyncInfo->pFsm is moved into the node and cleared in
// `pSyncInfo`. On failure a pFsm still held by `pSyncInfo` is freed and the
// partially built node is handed to syncNodeClose().
//
// Returns the new node, or NULL on any failure (terrno is set on most, but
// not all, failure paths).
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // "file ver" is the version loaded from raft_config.json, i.e. the value
      // the comparison above was made against
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  // ownership of the fsm moves to the node here; the _error path only frees
  // a pFsm that is still held by pSyncInfo
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      // an incomplete fsm is only fatal when there is a single replica
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error instead of returning directly, so that the node and
      // everything created so far is released. This relies on syncNodeClose()
      // accepting a partially built node, as the other error paths here do.
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // same as above: clean up through _error rather than leaking the node
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1436

1437
#ifdef BUILD_NO_CALL
1438
// Advance the node's commitIndex to the snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing when no fsm or no FpGetSnapshotInfo
// callback is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  // commitIndex never moves backwards here
  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
1447
#endif
1448

1449
// Restore a freshly opened sync node: check that the log store and the log buffer exist,
// raise commitIndex to the log store's commit index, and (unless the FSM state is
// SYNC_FSM_STATE_INCOMPLETE) commit the log buffer up to that index.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or log buffer is missing,
// otherwise the final value of `code` through TAOS_RETURN.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // take a consistent view of the indexes under the log buffer mutex
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Report the code assigned above: terrno is not set on this path, so terrstr() would
    // describe whatever error happened to be recorded last.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // NOTE(review): the mismatch is only warned about (the early return is disabled). `code`
    // is overwritten by the commit below unless the FSM state is incomplete, in which case
    // TSDB_CODE_WAL_LOG_INCOMPLETE is what gets returned -- confirm this asymmetry is intended.
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) {
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1484

1485
// Start raft on an opened sync node.
//  - a learner starts as learner;
//  - a single-replica node bumps its term, becomes leader and appends a no-op entry;
//  - any other node starts as follower with an empty leader id.
// Finally the ping timer is started; its result is the return value.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum != 1) {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret));
  }

  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return ret;
}
1511

1512
#ifdef BUILD_NO_CALL
1513
// Put the node into stand-by: mark it follower, stop the heartbeat timers, restart the elect
// timer with TIMER_MAX_MS and start the ping timer.
// Returns -1 when restarting the elect timer or starting the ping timer reports a negative
// code; otherwise the ping-timer result.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // role change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // elect timer: use the maximum interval
  int32_t longElectMs = TIMER_MAX_MS;
  int32_t rc = syncNodeRestartElectTimer(pSyncNode, longElectMs);
  if (rc < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  rc = syncNodeStartPingTimer(pSyncNode);
  if (rc < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return rc;
}
1536
#endif
1537

1538
// First phase of closing a sync node: stop the elect, heartbeat and ping timers (in that
// order) and clean the pending responses. Bails out, after logging, when the node, its FSM
// or the FSM's FpApplyQueueItems callback is missing, or as soon as one timer fails to stop.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // all timers are stopped: clean pending responses
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1574

1575
// Second phase of closing a sync node: stop (if running) and destroy the snapshot receiver.
// A NULL node is ignored, matching syncNodePreClose() and syncNodeClose().
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
           pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    // cleared so that syncNodeClose() does not destroy the receiver a second time
    pSyncNode->pNewNodeReceiver = NULL;
  }
}
11,905✔
1587

1588
// Release one heartbeat-timer data record.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,337!
1589

1590
// Tear down a sync node and free it. A NULL node is ignored.
// Order: clean pending responses, stop ping/elect/heartbeat timers, destroy the replication
// managers and bookkeeping objects, destroy the snapshot senders and receiver, free the FSM,
// close the raft store and finally free the node itself.
// If any timer fails to stop, the function logs and returns without releasing anything.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // ---- timers ----
  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // ---- replication state and bookkeeping ----
  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // ---- snapshot senders ----
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[slot] = NULL;
  }

  // ---- snapshot receiver ----
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // ---- fsm, raft store, node ----
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1659

1660
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1661

1662
// timer control --------------
1663
// (Re)arm the ping timer and copy the user logic clock into the timer logic clock.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the timer reset reports "stopped"; returns 0
// otherwise, including the case where the sync env is not initialized (only logged).
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1678

1679
// Stop the ping timer: bump the user logic clock, stop the timer handle and forget it.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1687

1688
// Arm the elect timer to fire after `ms` milliseconds and record the timer parameters
// (execute time, logic clock, owning node). Always returns 0: both a "stopped" timer reset
// and an uninitialized sync env are only logged.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }

  return 0;
}
1707

1708
// Stop the elect timer: bump the elect logic clock, stop the timer handle and forget it.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1717

1718
// Stop the elect timer and start it again with a timeout of `ms` milliseconds.
// Returns the first non-zero code reported by either step, 0 otherwise.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));

  return 0;
}
1724

1725
// Restart the elect timer with a fresh timeout: TIMER_MAX_MS for a stand-by node, otherwise
// a random value drawn by syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
// A failed restart is logged as a warning.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS;
  if (!pSyncNode->raftCfg.isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  } else {
    electMS = TIMER_MAX_MS;
  }

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1743

1744
#ifdef BUILD_NO_CALL
1745
// (Re)arm the node-wide heartbeat timer and copy the user logic clock into the timer logic
// clock.
//
// taosTmrReset() yields a bool "stopped" flag (see its use for the elect timer), not an error
// code, so it must not be passed to TAOS_CHECK_RETURN. It is handled the same way as in
// syncNodeStartPingTimer(): a true result is logged and mapped to
// TSDB_CODE_SYN_INTERNAL_ERROR.
//
// Returns 0 on success or when the sync env is not initialized (only logged).
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1758
#endif
1759

1760
// Start the per-peer heartbeat timer of every peer that has one.
// Returns the first non-zero code reported by syncHbTimerStart(), 0 otherwise.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return 0;
}
1777

1778
// Stop the per-peer heartbeat timer of every peer that has one.
// Returns the first non-zero code reported by syncHbTimerStop(), 0 otherwise.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return 0;
}
1797

1798
#ifdef BUILD_NO_CALL
1799
// Stop and then start the per-peer heartbeat timers.
// Returns the first non-zero code reported by either step, 0 otherwise.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));

  return 0;
}
1806
#endif
1807

1808
// Send a sync message to the peer whose raft address equals destRaftId->addr.
// The message body is converted with syncUtilMsgHtoN() and marked noResp before it is handed
// to the node's syncSendMSg callback. When no peer matches, no callback is installed, or the
// callback reports a negative code, the failure is logged and pMsg->pCont is released with
// rpcFreeCont(). The resulting code (-1 when nothing was sent) is returned via TAOS_RETURN.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the destination peer
  SEpSet* pTargetEp = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum; ++peer) {
    if (pNode->peersId[peer].addr != destRaftId->addr) {
      continue;
    }
    pTargetEp = &pNode->peersEpset[peer];
    break;
  }

  int32_t code = -1;
  if (pTargetEp != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pTargetEp, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code),
           pTargetEp, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1832

1833
// Tell whether this node is a member of the given config.
// Membership is evaluated twice -- by fqdn + port and by raft id -- and the two answers must
// agree; when they differ, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  // check 1: match on fqdn and port
  bool foundByEndpoint = false;
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum && !foundByEndpoint; ++idx) {
    foundByEndpoint = strcmp(pCfg->nodeInfo[idx].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                      pCfg->nodeInfo[idx].nodePort == pNode->myNodeInfo.nodePort;
  }

  // check 2: match on raft id
  bool foundByRaftId = false;
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum && !foundByRaftId; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };
    foundByRaftId = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (foundByEndpoint == foundByRaftId) {
    return foundByEndpoint;
  }

  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return false;
}
1863

1864
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,017✔
1865
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,017✔
1866
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,059✔
1867
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
4,952✔
1868
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,593✔
1869
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,593✔
1870
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,593!
1871
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,593✔
1872
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,591✔
1873
  }
1874

1875
  return false;
1,359✔
1876
}
1877

1878
// Apply a new raft configuration to the sync node.
//
// When the new config equals the current one nothing happens. Otherwise the config and its
// last-change index are stored, the index is appended to the config-index history and, if
// this node is a member of the new config, all derived state is rebuilt: own node info and
// raft id, peers, replica ids, quorum, next/match index managers, vote bookkeeping and the
// snapshot senders (senders of replicas that survive the change are re-used under their new
// replica index). The config is persisted in both cases.
//
// Returns 0 on success or when nothing changed; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when
// the config-index history is full; terrno when a node info cannot be converted to a raft id;
// otherwise the code reported by snapshotSenderCreate() / syncWriteCfgFile().
//
// NOTE(review): the in-memory config is replaced before the fallible steps run, so an error
// return leaves the node partially reconfigured -- confirm callers treat that as fatal.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the real vgroup id instead of a hard-coded "1"
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // arguments follow the specifiers one-to-one: %d %d for the replica totals, PRId64 PRId64
  // for the last indexes, %d for the voting replica count
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move each surviving sender to the slot of its replica in the new config
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // free old: done before creating the new senders so that a failing
    // snapshotSenderCreate() below cannot leak the senders that were not re-used
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2050

2051
// raft state change --------------
2052
// Store `term` as the current term when it is newer than the stored one; the node's role is
// left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
6,354✔
2057

2058
// Step down in reaction to `newTerm` observed from node `id`.
//  - newTerm older than the current term: ignored;
//  - an assigned leader regenerates its arbitration token (under arbTokenMutex);
//  - newTerm newer: store it, become follower of `id`, clear the vote;
//  - same term: become follower of `id` unless already follower.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm curTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < curTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > curTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);

    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2088

2089
// Clean the pending responses held by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
4,920✔
2090

2091
// Switch the node to the follower role with `leaderId` as the cached leader.
// Steps: update role state, stop the heartbeat timers, clean pending responses, invoke the
// FSM's FpBecomeFollowerCb (if any), invalidate minMatchIndex, reset the log buffer and
// reset the elect timer. The function logs and returns early when stopping the heartbeat
// timers or resetting the log buffer fails.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  // a former leader first drops its leader cache; the cache is then set to leaderId below
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // role state
  pSyncNode->leaderCache = leaderId;
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  int32_t rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2132

2133
// Switch the node to the learner role: update role state, invoke the FSM's FpBecomeLearnerCb
// (if any), invalidate minMatchIndex and reset the log buffer (a failure is logged).
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // role state
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  // notify the state machine
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
}
2158

2159
// TLA+ Spec
2160
// \* Candidate i transitions to leader.
2161
// BecomeLeader(i) ==
2162
//     /\ state[i] = Candidate
2163
//     /\ votesGranted[i] \in Quorum
2164
//     /\ state'      = [state EXCEPT ![i] = Leader]
2165
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2166
//                          [j \in Server |-> Len(log[i]) + 1]]
2167
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2168
//                          [j \in Server |-> 0]]
2169
//     /\ elections'  = elections \cup
2170
//                          {[eterm     |-> currentTerm[i],
2171
//                            eleader   |-> i,
2172
//                            elog      |-> log[i],
2173
//                            evotes    |-> votesGranted[i],
2174
//                            evoterLog |-> voterLog[i]]}
2175
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2176
//
2177
// Switch this node to the LEADER role.
//
// In order: bump the role counters, record the new state / role timestamp /
// leader cache, point every replica's next index right after the local last
// index, invalidate every match index, reset the per-peer send state, stop a
// running snapshot receiver, stop the election timer, start the heartbeat
// timer, send heartbeats immediately, invoke the FSM become-leader callback,
// and reset the log buffer.
//
// pSyncNode: node being promoted.
// debugStr:  free-form text appended to the final "become leader" log line.
//
// Returns nothing. A failing step is logged and the remaining steps are
// skipped; the fields assigned before that point stay modified.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Every replica gets the same next index, so the last index is queried once
  // rather than once per replica. This also avoids shadowing `code`.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2268

2269
// Switch this node to the ASSIGNED_LEADER role.
//
// Runs the same sequence of steps as a regular leader promotion: role
// counters, state / role timestamp / leader cache, next and match indexes,
// per-peer send state, snapshot receiver, timers, immediate heartbeat, FSM
// callback, log buffer reset. It differs in that restoreFinish is not reset
// and the FSM hook invoked is FpBecomeAssignedLeaderCb.
//
// Returns nothing. A failing step is logged and the remaining steps are
// skipped; the fields assigned before that point stay modified.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // restoreFinish is not reset on this path

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Every replica gets the same next index, so the last index is queried once
  // rather than once per replica. This also avoids shadowing `code`.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2347

2348
// Promote a candidate that holds a majority of votes to leader.
//
// Nothing happens (apart from an error log) unless the node is currently a
// candidate and its granted votes form a majority. After the promotion a noop
// entry is appended; a failure to append is only logged.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  // summary line with the log position at promotion time
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);
}
2372

2373
// A sync node is treated as the mnode when its vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}
526,946✔
2374

2375
// Reset the send bookkeeping (last sent index and last send time) of every
// peer-state slot, i.e. TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA entries.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  while (slot-- > 0) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }
  return 0;
}
2383

2384
// Move a follower into the CANDIDATE state and stamp the role-change time.
// Any other starting state is rejected with an error log and no change.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  const bool isFollower = (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  if (!isFollower) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->roleTimeMs = taosGetTimestampMs();
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}
2397

2398
// Turn an assigned leader into a regular leader.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not currently an
// assigned leader, otherwise 0. A failure to append the noop entry after the
// promotion is logged but does not change the return value.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // The ", " in front of "assigned commit index" keeps the two numbers apart
  // in the log line.
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2416

2417
// just called by syncNodeVoteForSelf
2418
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,315✔
2419
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,315✔
2420
  if (term != storeTerm) {
1,315!
2421
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2422
    return;
×
2423
  }
2424
  bool voted = raftStoreHasVoted(pSyncNode);
1,315✔
2425
  if (voted) {
1,315!
2426
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2427
    return;
×
2428
  }
2429

2430
  raftStoreVote(pSyncNode, pRaftId);
1,315✔
2431
}
2432

2433
// simulate get vote from outside
2434
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,315✔
2435
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,315✔
2436

2437
  SRpcMsg rpcMsg = {0};
1,315✔
2438
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,315✔
2439
  if (ret != 0) return;
1,315!
2440

2441
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,315✔
2442
  pMsg->srcId = pSyncNode->myRaftId;
1,315✔
2443
  pMsg->destId = pSyncNode->myRaftId;
1,315✔
2444
  pMsg->term = currentTerm;
1,315✔
2445
  pMsg->voteGranted = true;
1,315✔
2446

2447
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,315✔
2448
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,315✔
2449
  rpcFreeCont(rpcMsg.pCont);
1,315✔
2450
}
2451

2452
// return if has a snapshot
2453
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
17,992✔
2454
  bool      ret = false;
17,992✔
2455
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
17,992✔
2456
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
17,992!
2457
    // TODO check return value
2458
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
17,992✔
2459
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
17,992✔
2460
      ret = true;
2,115✔
2461
    }
2462
  }
2463
  return ret;
17,992✔
2464
}
2465

2466
// return max(logLastIndex, snapshotLastIndex)
2467
// if no snapshot and log, return -1
2468
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
17,997✔
2469
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
17,997✔
2470
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
17,997!
2471
    // TODO check return value
2472
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
17,997✔
2473
  }
2474
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
17,997✔
2475

2476
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
17,997✔
2477
  return lastIndex;
17,997✔
2478
}
2479

2480
// return the last term of snapshot and log
2481
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2482
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
17,992✔
2483
  SyncTerm lastTerm = 0;
17,992✔
2484
  if (syncNodeHasSnapshot(pSyncNode)) {
17,992✔
2485
    // has snapshot
2486
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,115✔
2487
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,115!
2488
      // TODO check return value
2489
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,115✔
2490
    }
2491

2492
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,115✔
2493
    if (logLastIndex > snapshot.lastApplyIndex) {
2,115✔
2494
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,221✔
2495
    } else {
2496
      lastTerm = snapshot.lastApplyTerm;
894✔
2497
    }
2498

2499
  } else {
2500
    // no snapshot
2501
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
15,877✔
2502
  }
2503

2504
  return lastTerm;
17,991✔
2505
}
2506

2507
// get last index and term along with snapshot
2508
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
15,919✔
2509
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
15,919✔
2510
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
15,919✔
2511
  return 0;
15,918✔
2512
}
2513

2514
#ifdef BUILD_NO_CALL
2515
// return append-entries first try index
2516
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2517
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2518
  return syncStartIndex;
2519
}
2520

2521
// if index > 0, return index - 1
2522
// else, return -1
2523
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2524
  SyncIndex preIndex = index - 1;
2525
  if (preIndex < SYNC_INDEX_INVALID) {
2526
    preIndex = SYNC_INDEX_INVALID;
2527
  }
2528

2529
  return preIndex;
2530
}
2531

2532
// if index < 0, return SYNC_TERM_INVALID
2533
// if index == 0, return 0
2534
// if index > 0, return preTerm
2535
// if error, return SYNC_TERM_INVALID
2536
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2537
  if (index < SYNC_INDEX_BEGIN) {
2538
    return SYNC_TERM_INVALID;
2539
  }
2540

2541
  if (index == SYNC_INDEX_BEGIN) {
2542
    return 0;
2543
  }
2544

2545
  SyncTerm  preTerm = 0;
2546
  SyncIndex preIndex = index - 1;
2547

2548
  SSyncRaftEntry* pPreEntry = NULL;
2549
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2550
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2551
  int32_t         code = 0;
2552
  if (h) {
2553
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2554
    code = 0;
2555

2556
    pSyncNode->pLogStore->cacheHit++;
2557
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2558

2559
  } else {
2560
    pSyncNode->pLogStore->cacheMiss++;
2561
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2562

2563
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2564
  }
2565

2566
  SSnapshot snapshot = {.data = NULL,
2567
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2568
                        .lastApplyTerm = SYNC_TERM_INVALID,
2569
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2570

2571
  if (code == 0) {
2572
    if (pPreEntry == NULL) return -1;
2573
    preTerm = pPreEntry->term;
2574

2575
    if (h) {
2576
      taosLRUCacheRelease(pCache, h, false);
2577
    } else {
2578
      syncEntryDestroy(pPreEntry);
2579
    }
2580

2581
    return preTerm;
2582
  } else {
2583
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2584
      // TODO check return value
2585
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2586
      if (snapshot.lastApplyIndex == preIndex) {
2587
        return snapshot.lastApplyTerm;
2588
      }
2589
    }
2590
  }
2591

2592
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2593
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2594
  return SYNC_TERM_INVALID;
2595
}
2596

2597
// get pre index and term of "index"
2598
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2599
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2600
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2601
  return 0;
2602
}
2603
#endif
2604

2605
static void syncNodeEqPingTimer(void* param, void* tmrId) {
248,465✔
2606
  if (!syncIsInit()) return;
248,465!
2607

2608
  int64_t    rid = (int64_t)param;
248,465✔
2609
  SSyncNode* pNode = syncNodeAcquire(rid);
248,465✔
2610

2611
  if (pNode == NULL) return;
248,465!
2612

2613
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
248,465!
2614
    SRpcMsg rpcMsg = {0};
248,465✔
2615
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
248,465✔
2616
                                    pNode->pingTimerMS, pNode);
2617
    if (code != 0) {
248,465!
2618
      sError("failed to build ping msg");
×
2619
      rpcFreeCont(rpcMsg.pCont);
×
2620
      goto _out;
×
2621
    }
2622

2623
    // sTrace("enqueue ping msg");
2624
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
248,465✔
2625
    if (code != 0) {
248,465✔
2626
      sError("failed to sync enqueue ping msg since %s", terrstr());
1!
2627
      rpcFreeCont(rpcMsg.pCont);
1✔
2628
      goto _out;
1✔
2629
    }
2630

2631
  _out:
248,464✔
2632
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
248,465!
2633
                     &pNode->pPingTimer))
2634
      sError("failed to reset ping timer");
×
2635
  }
2636
  syncNodeRelease(pNode);
248,465✔
2637
}
2638

2639
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,320✔
2640
  if (!syncIsInit()) return;
1,325!
2641

2642
  int64_t    rid = (int64_t)param;
1,320✔
2643
  SSyncNode* pNode = syncNodeAcquire(rid);
1,320✔
2644

2645
  if (pNode == NULL) return;
1,320!
2646

2647
  if (pNode->syncEqMsg == NULL) {
1,320!
2648
    syncNodeRelease(pNode);
×
2649
    return;
×
2650
  }
2651

2652
  int64_t tsNow = taosGetTimestampMs();
1,320✔
2653
  if (tsNow < pNode->electTimerParam.executeTime) {
1,320✔
2654
    syncNodeRelease(pNode);
5✔
2655
    return;
5✔
2656
  }
2657

2658
  SRpcMsg rpcMsg = {0};
1,315✔
2659
  int32_t code =
2660
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,315✔
2661

2662
  if (code != 0) {
1,315!
2663
    sError("failed to build elect msg");
×
2664
    syncNodeRelease(pNode);
×
2665
    return;
×
2666
  }
2667

2668
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,315✔
2669
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,315!
2670

2671
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,315✔
2672
  if (code != 0) {
1,315!
2673
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2674
    rpcFreeCont(rpcMsg.pCont);
×
2675
    syncNodeRelease(pNode);
×
2676
    return;
×
2677
  }
2678

2679
  syncNodeRelease(pNode);
1,315✔
2680
}
2681

2682
#ifdef BUILD_NO_CALL
2683
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2684
  if (!syncIsInit()) return;
2685

2686
  int64_t    rid = (int64_t)param;
2687
  SSyncNode* pNode = syncNodeAcquire(rid);
2688

2689
  if (pNode == NULL) return;
2690

2691
  if (pNode->totalReplicaNum > 1) {
2692
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2693
      SRpcMsg rpcMsg = {0};
2694
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2695
                                      pNode->heartbeatTimerMS, pNode);
2696

2697
      if (code != 0) {
2698
        sError("failed to build heartbeat msg");
2699
        goto _out;
2700
      }
2701

2702
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2703
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2704
      if (code != 0) {
2705
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2706
        rpcFreeCont(rpcMsg.pCont);
2707
        goto _out;
2708
      }
2709

2710
    _out:
2711
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2712
                       &pNode->pHeartbeatTimer) != 0)
2713
        return;
2714

2715
    } else {
2716
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2717
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2718
    }
2719
  }
2720
}
2721
#endif
2722

2723
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
40,914✔
2724
  int32_t code = 0;
40,914✔
2725
  int64_t hbDataRid = (int64_t)param;
40,914✔
2726
  int64_t tsNow = taosGetTimestampMs();
40,914✔
2727

2728
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
40,914✔
2729
  if (pData == NULL) {
40,914!
2730
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2731
    return;
×
2732
  }
2733

2734
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
40,914✔
2735
  if (pSyncNode == NULL) {
40,914✔
2736
    syncHbTimerDataRelease(pData);
3✔
2737
    sError("hb timer get pSyncNode NULL");
3!
2738
    return;
3✔
2739
  }
2740

2741
  SSyncTimer* pSyncTimer = pData->pTimer;
40,911✔
2742

2743
  if (!pSyncNode->isStart) {
40,911!
2744
    syncNodeRelease(pSyncNode);
×
2745
    syncHbTimerDataRelease(pData);
×
2746
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2747
    return;
×
2748
  }
2749

2750
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
40,911!
2751
    syncNodeRelease(pSyncNode);
×
2752
    syncHbTimerDataRelease(pData);
×
2753
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2754
    return;
×
2755
  }
2756

2757
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
40,911!
2758
         pData->destId.addr);
2759

2760
  if (pSyncNode->totalReplicaNum > 1) {
40,911✔
2761
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
40,909✔
2762
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
40,909✔
2763

2764
    if (timerLogicClock == msgLogicClock) {
40,909✔
2765
      if (tsNow > pData->execTime) {
40,906✔
2766
        pData->execTime += pSyncTimer->timerMS;
40,787✔
2767

2768
        SRpcMsg rpcMsg = {0};
40,787✔
2769
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
40,787!
2770
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2771
          syncNodeRelease(pSyncNode);
×
2772
          syncHbTimerDataRelease(pData);
×
2773
          return;
×
2774
        }
2775

2776
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
40,787✔
2777

2778
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
40,787✔
2779
        pSyncMsg->srcId = pSyncNode->myRaftId;
40,787✔
2780
        pSyncMsg->destId = pData->destId;
40,787✔
2781
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
40,787✔
2782
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
40,787✔
2783
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
40,787✔
2784
        pSyncMsg->privateTerm = 0;
40,787✔
2785
        pSyncMsg->timeStamp = tsNow;
40,787✔
2786

2787
        // update reset time
2788
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
40,787✔
2789
        pSyncTimer->timeStamp = tsNow;
40,787✔
2790

2791
        // send msg
2792
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
40,787✔
2793
        STraceId* trace = &(rpcMsg.info.traceId);
40,787✔
2794
        sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
40,787!
2795
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
40,787✔
2796
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
40,787✔
2797
        if (ret != 0) {
40,787✔
2798
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
82!
2799
        }
2800
      }
2801

2802
      if (syncIsInit()) {
40,906!
2803
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
40,906!
2804
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
40,906✔
2805
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
40,906✔
2806
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
40,906!
2807

2808
      } else {
2809
        sError("sync env is stop, reset peer hb timer error");
×
2810
      }
2811

2812
    } else {
2813
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
3!
2814
             timerLogicClock, msgLogicClock);
2815
    }
2816
  }
2817

2818
  syncHbTimerDataRelease(pData);
40,911✔
2819
  syncNodeRelease(pSyncNode);
40,911✔
2820
}
2821

2822
#ifdef BUILD_NO_CALL
2823
// LRU cache deleter: frees the cached value. The key, its length and the
// user data pointer are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2827

2828
// Insert pEntry into the log store's LRU cache, keyed by its index and
// charged sizeof(*pEntry) + dataLen, with deleteCacheEntry as the deleter.
// The cache handle is written through `h`.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, otherwise -1.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus inserted = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                          deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (inserted == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2842
#endif
2843

2844
// Build a sync configuration from an alter-replica request.
//
// Voters from pReq->replicas fill cfg->nodeInfo first, followed by learners
// from pReq->learnerReplicas. Each node's dnode info is refreshed through
// tmsgUpdateDnodeInfo and logged. On return replicaNum counts the voters and
// totalReplicaNum counts voters plus learners. myIndex is taken from
// selfIndex, or from replica + learnerSelfIndex when the latter is set;
// it is left untouched when both are -1.
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->totalReplicaNum = 0;
  cfg->replicaNum = 0;

  // voters
  for (int v = 0; v < pReq->replica; ++v) {
    SNodeInfo* pVoter = &cfg->nodeInfo[v];
    pVoter->nodeRole = TAOS_SYNC_ROLE_VOTER;
    pVoter->nodeId = pReq->replicas[v].id;
    pVoter->nodePort = pReq->replicas[v].port;
    tstrncpy(pVoter->nodeFqdn, pReq->replicas[v].fqdn, sizeof(pVoter->nodeFqdn));

    bool updated = tmsgUpdateDnodeInfo(&pVoter->nodeId, &pVoter->clusterId, pVoter->nodeFqdn, &pVoter->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, v, pVoter->nodeFqdn,
          pVoter->nodePort, pVoter->nodeId, pVoter->nodeRole, updated);
    cfg->replicaNum++;
  }

  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: totalReplicaNum doubles as the running index into
  // learnerReplicas until the voter count is added below
  for (int slot = cfg->replicaNum; slot < pReq->replica + pReq->learnerReplica; ++slot) {
    SNodeInfo* pLearner = &cfg->nodeInfo[slot];
    pLearner->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    pLearner->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pLearner->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    tstrncpy(pLearner->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pLearner->nodeFqdn));

    bool updated = tmsgUpdateDnodeInfo(&pLearner->nodeId, &pLearner->clusterId, pLearner->nodeFqdn, &pLearner->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pLearner->nodeFqdn,
          pLearner->nodePort, pLearner->nodeId, pLearner->nodeRole, updated);
    cfg->totalReplicaNum++;
  }
  cfg->totalReplicaNum += pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  cfg->changeVersion = pReq->changeVersion;
}
×
2880

2881
// Inspect a config-change entry and step down if this leader is no longer
// part of the new configuration.
//
// Returns:
//   TSDB_CODE_SYN_INTERNAL_ERROR  the entry is not a TDMT_SYNC_CONFIG_CHANGE
//   TSDB_CODE_INVALID_MSG         the request payload cannot be deserialized
//   1                             this node (leader / assigned leader) is
//                                 missing from the new config and stepped down
//   0                             otherwise
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the request body follows the message head
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  bool leading = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (newCfg.totalReplicaNum < 1 || !leading) {
    return 0;
  }

  // still a member when some node in the new config has our fqdn and port
  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort) {
      return 0;
    }
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
2919

2920
// Dump the node's current configuration to the info log: a summary line, this
// node's info, then each peer's node info, endpoint set and raft id, and
// finally every configured replica's node info and raft id.
//
// ths: node whose state is logged.
// cfg: not read by this function.
// str: caller-supplied tag inserted into every line.
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  // One line per peer, rendering that peer's own endpoint set as
  // "{fqdn:port, fqdn:port}". The inner index is `j` so it does not shadow
  // the peer index `i`.
  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
2966

2967
// Rebuild the node's peer tables and stored raft config from `cfg`.
//
// Pass 1 copies every configured node except this one (matched by fqdn and
// port) into peersNodeInfo and derives its endpoint set and raft id.
// Pass 2 copies every configured node into raftCfg.cfg.nodeInfo, counts the
// voters into replicaNum and records this node's position in myIndex.
//
// Returns 0 on success, or terrno when a peer's raft id cannot be determined
// (peersNum and raftCfg are not updated in that case).
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    bool isSelf = (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort);
    if (isSelf) {
      continue;
    }

    ths->peersNodeInfo[peerCnt].nodeId = pSrc->nodeId;
    ths->peersNodeInfo[peerCnt].nodePort = pSrc->nodePort;
    ths->peersNodeInfo[peerCnt].nodeRole = pSrc->nodeRole;
    ths->peersNodeInfo[peerCnt].clusterId = pSrc->clusterId;
    tstrncpy(ths->peersNodeInfo[peerCnt].nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);

    syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[peerCnt], &ths->peersEpset[peerCnt]);

    if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[peerCnt], ths->vgId, &ths->peersId[peerCnt]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  int32_t nodeCnt = 0;
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeId = pSrc->nodeId;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodePort = pSrc->nodePort;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeRole = pSrc->nodeRole;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].clusterId = pSrc->clusterId;
    tstrncpy(ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = nodeCnt;
    }
    nodeCnt++;
  }
  ths->raftCfg.cfg.totalReplicaNum = nodeCnt;

  return 0;
}
3015

3016
// Promote nodes to voter according to the incoming configuration `cfg`.
//
// A node is identified by its fqdn:port pair. For every peer of `ths`, and for
// every entry of ths->raftCfg.cfg, whose fqdn:port appears in `cfg` with the
// voter role, the role is set to TAOS_SYNC_ROLE_VOTER. Roles are never demoted
// here. ths->raftCfg.cfg.replicaNum is reset and recounted as the number of
// such voter matches.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // pass 1: peersNodeInfo
  for (int32_t peerIdx = 0; peerIdx < ths->peersNum; ++peerIdx) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[peerIdx];

    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      SNodeInfo* pIncoming = &cfg->nodeInfo[k];

      if (strcmp(pPeer->nodeFqdn, pIncoming->nodeFqdn) != 0) continue;
      if (pPeer->nodePort != pIncoming->nodePort) continue;
      if (pIncoming->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // pass 2: raftCfg.cfg.nodeInfo, recounting the voters as we go
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t cfgIdx = 0; cfgIdx < ths->raftCfg.cfg.totalReplicaNum; ++cfgIdx) {
    SNodeInfo* pLocal = &ths->raftCfg.cfg.nodeInfo[cfgIdx];

    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      SNodeInfo* pIncoming = &cfg->nodeInfo[k];

      if (strcmp(pLocal->nodeFqdn, pIncoming->nodeFqdn) != 0) continue;
      if (pLocal->nodePort != pIncoming->nodePort) continue;
      if (pIncoming->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      pLocal->nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3043

3044
// Rebuild the per-replica runtime state of `ths` so that it matches
// ths->raftCfg.cfg, carrying over the state of replicas that already existed.
//
// Replicas are matched between the old and the new layout by raft id
// (syncUtilSameId). The steps are:
//   1. replicasId            - rebuilt from raftCfg.cfg.nodeInfo
//   2. pMatchIndex           - recreated, old values copied where the id still exists
//   3. pNextIndex            - recreated, old values copied where the id still exists
//   4. votes granted/respond - updated for the new membership, old votes not kept
//   5. logReplMgrs           - recreated, old managers copied into their new slot
//   6. snapshot senders      - all destroyed and recreated
//   7. heartbeat timers      - stopped, re-initialised and restarted
//   8. peerStates            - old states copied into their new slot
//
// @param ths                 sync node whose raftCfg.cfg already holds the new configuration
// @param oldtotalReplicaNum  number of replicas in the layout being replaced
// @return 0 on success, otherwise the error code of the step that failed
//         (the node may be left partially rebuilt in that case).
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;
  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // Snapshot of the old managers by value; it is released on every exit path below.
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        // the copied manager still carries its old slot number
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    int32_t              senderCode = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      // do not leak the snapshot of the old log replication managers
      taosMemoryFree(oldLogReplMgrs);
      return terrno = senderCode;
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      taosMemoryFree(oldLogReplMgrs);
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);

  return 0;
}
3195

3196
// Apply the voter count stored in ths->raftCfg.cfg to the node's runtime state.
//
// Only replicaNum is touched: on the node itself and on the match/next index
// managers. The replica ids, the index arrays and the log replication managers
// are left as they are. The vote bookkeeping is then updated for the node.
void syncNodeChangeToVoter(SSyncNode* ths) {
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  SSyncIndexMgr* pNext = ths->pNextIndex;

  // replicasId stay untouched, only the voter count changes (1->3)
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t idx = 0; idx < ths->totalReplicaNum; ++idx) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, idx, ths->replicasId[idx].addr);
  }

  // same for the index managers: only the voter count changes (1->3)
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  pNext->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t idx = 0; idx < pMatch->totalReplicaNum; ++idx) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, idx, pMatch->index[idx]);
  }

  // vote bookkeeping follows the new membership
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs need no change when going 1->3
}
×
3220

3221
// Overwrite the first peersNum entries of peersNodeInfo and the first
// raftCfg.cfg.totalReplicaNum entries of raftCfg.cfg.nodeInfo with a
// zero-initialised SNodeInfo. The counters themselves are left unchanged.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  SNodeInfo blank = {0};
  int32_t   idx = 0;

  for (idx = 0; idx < ths->peersNum; ++idx) {
    memcpy(&ths->peersNodeInfo[idx], &blank, sizeof(SNodeInfo));
  }

  for (idx = 0; idx < ths->raftCfg.cfg.totalReplicaNum; ++idx) {
    memcpy(&ths->raftCfg.cfg.nodeInfo[idx], &blank, sizeof(SNodeInfo));
  }
}
×
3231

3232
// Apply a configuration-change log entry to the sync node.
//
// The entry payload (after its SMsgHead) is deserialized into a request and
// turned into an SSyncCfg. Entries whose changeVersion is not newer than the
// node's current one are skipped. Otherwise one of three paths is taken:
//   - new cfg has 1 or 2 replicas: a replica is removed (either a peer, or
//     this node itself, in which case this node becomes a lone learner);
//   - node currently has 3 replicas: replica types change (learner -> voter);
//   - anything else: a replica is added.
// Afterwards quorum, lastConfigIndex, lastIndex and changeVersion are updated
// and the cfg file is written.
//
// @param ths    sync node to reconfigure
// @param pEntry raft entry; must have originalRpcType TDMT_SYNC_CONFIG_CHANGE
// @param str    caller tag used in the log line; "Commit" selects the first log format
// @return 0 on success or when the entry is skipped, an error code otherwise
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;

  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the request body sits right behind the message head
  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // stale or duplicate change: nothing to do
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  // remembered because the rebuild helpers overwrite ths->totalReplicaNum
  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // ---- remove replica ----
    // is this node (matched by fqdn:port) still part of the new configuration?
    bool stillMember = false;
    for (int32_t k = 0; k < cfg.totalReplicaNum && !stillMember; ++k) {
      stillMember = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) == 0 &&
                     ths->myNodeInfo.nodePort == cfg.nodeInfo[k].nodePort);
    }

    if (stillMember) {
      // another node is removed; myNodeInfo stays as it is
      syncNodeResetPeerAndCfg(ths);

      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // this node is removed: shrink it to a single learner so it is less
      // likely to disrupt the remaining servers
      syncNodeResetPeerAndCfg(ths);

      // myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // peers and cfg: only this node remains, and it is not a voter
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // everything derived from the cfg
      code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      // state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // ---- change replica type ----
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // myNodeInfo: promote if the new cfg lists this node as voter
    for (int32_t k = 0; k < cfg.totalReplicaNum; ++k) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) != 0) continue;
      if (ths->myNodeInfo.nodePort != cfg.nodeInfo[k].nodePort) continue;
      if (cfg.nodeInfo[k].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // peers and cfg
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);

    // everything derived from the cfg
    syncNodeChangeToVoter(ths);

    // state: a learner that became a voter continues as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // ---- add replica ----
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // myNodeInfo stays as it is

    // peers and cfg
    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // everything derived from the cfg
    code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // state stays as it is

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3405

3406
// Append a raft entry to the node's log buffer and advance commit where this
// node can do so on its own.
//
// On the two failure paths (payload shorter than an SMsgHead, or the log
// buffer rejecting the entry) the entry is destroyed here; in the second case
// the FSM is first executed with the failure code taken from terrno. The
// buffer is then proceeded in every case. An assigned leader updates and
// commits up to its assigned commit index; a node with a single voter updates
// and commits its regular commit index.
//
// @param ths    sync node
// @param pEntry entry to append; must not be used by the caller after a failure
// @return 0 on success, a negative/error code otherwise
//
// NOTE(review): on the single-replica path `code` is overwritten by the result
// of syncLogBufferCommit, so an append failure recorded above is not returned
// when that commit runs - confirm whether this is intended.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = -1;
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  // append to log buffer
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    int32_t ret = 0;
    // run the FSM with the failure code before dropping the entry
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
      sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  code = 0;
_out:;
  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");

  if (pEntry != NULL) {
    sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
           ", %" PRId64 ")",
           ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
           ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sTrace("vgId:%d, update assigned commit index %" PRId64 "", ths->vgId, index);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
      // report the index that was actually used as commit target
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->assignedCommitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) {
    sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
  }

  TAOS_RETURN(code);
}
3467

3468
// Tell whether heartbeat replies from at least `quorum` non-learner peers are
// overdue.
//
// A peer counts as overdue when the receive time recorded for it in
// pMatchIndex is older than tsHeartbeatTimeout milliseconds. Peers whose
// recorded receive time is 0 or -1 are ignored, as are learners. A node with a
// single replica never times out.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t overdue = 0;
  int64_t nowMs = taosGetTimestampMs();

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    if (pSyncNode->peersNodeInfo[peer].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    hasRecvTime = !(lastRecv == 0 || lastRecv == -1);

    if (hasRecvTime && (nowMs - lastRecv > tsHeartbeatTimeout)) {
      overdue++;
    }
  }

  return overdue >= pSyncNode->quorum;
}
3493

3494
// Return true when any of the first totalReplicaNum snapshot senders exists
// and has its `start` flag set; false otherwise, including for a NULL node.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t slot = 0; slot < pSyncNode->totalReplicaNum; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}
3505

3506
// Return true when the node has a snapshot receiver (pNewNodeReceiver) whose
// `start` flag is set; false otherwise, including for a NULL node.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3512

3513
// Build a no-op entry for the current term at the end index of the log buffer
// and append it through syncNodeAppend.
//
// @return the result of syncNodeAppend, or TSDB_CODE_OUT_OF_MEMORY when the
//         no-op entry cannot be built.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  if (pNoop == NULL) {
    int32_t oom = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(oom);
  }

  int32_t code = syncNodeAppend(ths, pNoop);
  TAOS_RETURN(code);
}
3527

3528
#ifdef BUILD_NO_CALL
3529
// Legacy variant (compiled only with BUILD_NO_CALL): build a no-op entry at the
// log store's write index and, when this node is leader, append it directly to
// the log store and put it into the entry cache.
//
// The entry is released through the cache handle when it was cached, and
// destroyed otherwise - including on the append-error path.
//
// @return 0 on success, -1 when the entry cannot be built or appended.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry was never cached, so it has to be released here
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
3557
#endif
3558

3559
// Handle an incoming sync heartbeat.
//
// Heartbeats from senders outside this node's raft group are dropped. For all
// others a heartbeat reply is built and sent back. In addition:
//   - a learner adopts a higher term carried by the heartbeat;
//   - when the terms match and this node is not a (assigned) leader, the
//     receive time is recorded, minMatchIndex is taken over, the election
//     timer is reset after replying, and followers/learners enqueue a local
//     commit command carrying the sender's commit index;
//   - when this node is a (assigned) leader and the heartbeat's term is not
//     lower than its own, a local step-down command is enqueued.
//
// Message buffers built here are released on every path on which they are not
// handed over: the reply when building a local command fails, and the local
// command when it cannot be enqueued (callback missing or enqueue failure).
//
// @return 0 on success (including dropped heartbeats), otherwise the error code
//         of building a message or sending the reply.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      int32_t buildCode = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
      if (buildCode != 0) {
        // the reply built above will never be sent
        rpcFreeCont(rpcMsg.pCont);
        TAOS_RETURN(buildCode);
      }

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      } else {
        // no queue to hand the command to, so it has to be released here
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    int32_t buildCode = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
    if (buildCode != 0) {
      // the reply built above will never be sent
      rpcFreeCont(rpcMsg.pCont);
      TAOS_RETURN(buildCode);
    }

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // no queue to hand the command to, so it has to be released here
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3656

3657
// Handle a heartbeat reply from a peer.
//
// Looks up the log replication manager of the replying peer, logs the reply
// together with its round-trip delay, records the receive time for the peer in
// pMatchIndex and passes the reply on to the replication manager.
//
// @return the result of syncLogReplProcessHeartbeatReply, or an error code
//         (terrno if set, else TSDB_CODE_SYN_RETURN_VALUE_NULL) when no
//         replication manager exists for the sender.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t         code = 0;
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // zero-padded 16-digit hex address
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3679

3680
#ifdef BUILD_NO_CALL
3681
// Legacy variant (compiled only with BUILD_NO_CALL): log the heartbeat reply
// with its delay and record the receive time of the sender in pMatchIndex.
// Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();

  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, tbuf);

  // the recorded time is what later decides whether the peer is considered alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
3696
#endif
3697

3698
// Execute a local command that was enqueued for this node.
//
//   - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried by the command.
//   - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: when the
//     command's term equals the last matched term of the log buffer, update the
//     commit index from the command; then commit the log buffer up to the
//     node's commit index unless the FSM state is incomplete.
//   - anything else is logged as an error.
//
// @return 0 in all cases except a negative last-match term, which yields
//         TSDB_CODE_SYN_INTERNAL_ERROR.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // commit command from a heartbeat
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // only take over the sender's commit index when our log matches its term
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
             ths->commitIndex);
    }
  }

  return 0;
}
3729

3730
// TLA+ Spec
3731
// ClientRequest(i, v) ==
3732
//     /\ state[i] = Leader
3733
//     /\ LET entry == [term  |-> currentTerm[i],
3734
//                      value |-> v]
3735
//            newLog == Append(log[i], entry)
3736
//        IN  log' = [log EXCEPT ![i] = newLog]
3737
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3738
//                    leaderVars, commitIndex>>
3739
//
3740

3741
// Turn a client request into a raft entry and append it to the local log.
//
// The entry is built for the current term at the end index of the log buffer.
// For TDMT_SYNC_CONFIG_CHANGE messages a response stub is registered first and
// its sequence number stored in the entry. Only a leader or assigned leader
// appends; any other state destroys the entry and fails. For config-change
// entries syncNodeCheckChangeConfig decides whether to go on: a negative
// result aborts, a positive result answers the request right away through the
// registered stub, and both destroy the entry and return that result.
//
// @param ths       sync node
// @param pMsg      client request
// @param pRetIndex optional; receives the index chosen for the entry
// @return result of syncNodeAppend, the check result for config changes that
//         are not appended, or TSDB_CODE_SYN_INTERNAL_ERROR when the entry
//         cannot be built or this node is not a leader.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2: the config change is added in the write thread and continues in the
  // sync thread, so the message has to be saved for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    pEntry->seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    int32_t checkCode = syncNodeCheckChangeConfig(ths, pEntry);

    if (checkCode < 0) {
      sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
      syncEntryDestroy(pEntry);
      TAOS_RETURN(checkCode);
    }

    if (checkCode > 0) {
      // answer the request now, using the stub registered above
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
             num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      TAOS_RETURN(checkCode);
    }
  }

  code = syncNodeAppend(ths, pEntry);
  return code;
}
3804

3805
// Translate a sync state into its fixed display name.
// Any value that is not one of the cases below yields "unknown".
// The returned pointer refers to a string literal and must not be freed.
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      // keep the "unknown" fallback
      break;
  }

  return name;
}
3825

3826
// Find this node's own entry in pNewCfg->nodeInfo and store its position in
// pNewCfg->myIndex.
//
// Each of the first totalReplicaNum entries is turned into a raft id (address from
// SYNC_ADDR, vgId taken from ths) and compared against ths->myRaftId.
//
// Returns 0 as soon as a matching entry is found; returns
// TSDB_CODE_SYN_INTERNAL_ERROR when no entry matches (myIndex is left untouched).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t idx = 0;

  while (idx < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[idx]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = idx;
      return 0;
    }

    ++idx;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3841

3842
// Returns true only when all three conditions hold, checked in this order:
//   1. ths->replicaNum is exactly 1,
//   2. syncUtilUserCommit() accepts the message type of pMsg,
//   3. ths->vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }

  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }

  return ths->vgId != 1;
}
3845

3846
// Reports whether pRaftId equals one of the first totalReplicaNum entries of
// ths->replicasId. Scanning stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool isMember = false;

  for (int32_t pos = 0; !isMember && pos < ths->totalReplicaNum; ++pos) {
    if (syncUtilSameId(&ths->replicasId[pos], pRaftId)) {
      isMember = true;
    }
  }

  return isMember;
}
3854

3855
// Look up the snapshot sender stored at the same index as the replica whose raft id
// equals pDestId.
//
// Returns NULL when no entry of ths->replicasId matches. The scan deliberately visits
// every replica without stopping, so if several entries match, the sender belonging to
// the last matching index is the one returned.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pMatch = NULL;
  int32_t              slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pMatch = ths->senders[slot];
    }
    ++slot;
  }

  return pMatch;
}
3864

3865
// Return a pointer to the element of ths->peerHeartbeatTimerArr that sits at the same
// index as the replica whose raft id equals pDestId, or NULL when none matches.
//
// Every replica is examined (no early exit), so the last matching index wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pHbTimer = NULL;

  for (int32_t k = 0; k < ths->totalReplicaNum; k++) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[k])) {
      continue;
    }
    pHbTimer = &ths->peerHeartbeatTimerArr[k];
  }

  return pHbTimer;
}
3874

3875
// Return a pointer to the element of ths->peerStates that sits at the same index as the
// replica whose raft id equals pDestId, or NULL when no replica matches.
//
// The whole replica list is walked without breaking out, so the last matching index
// determines the result.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pResult = NULL;
  int32_t     n = 0;

  while (n < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[n])) {
      pResult = &ths->peerStates[n];
    }
    n += 1;
  }

  return pResult;
}
3884

3885
#ifdef BUILD_NO_CALL
3886
// Decide whether an append-entries message should be sent to pDestId.
//
// Returns false when:
//   - no peer state exists for pDestId (an error is logged), or
//   - the index about to be sent (pMsg->prevLogIndex + 1) equals the peer's
//     lastSendIndex AND less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has elapsed since the
//     peer's lastSendTime.
// Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // The elapsed-time comparison is evaluated only when the index matches.
  bool sentRecently =
      pPeer->lastSendIndex == nextIndex && nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS;

  return !sentRecently;
}
3902

3903
// Check three conditions in order and return false (after logging) on the first that
// fails:
//   1. pSyncNode->changing must not be set;
//   2. if commitIndex >= SYNC_INDEX_BEGIN, commitIndex must equal the value returned by
//      syncNodeGetLastIndex();
//   3. for each of the peersNum peers, the snapshot sender found for that peer (if any)
//      must not have its `start` flag set.
// Returns true when all checks pass.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasCommitIndex = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommitIndex) {
    SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSnapSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSnapSender != NULL && pSnapSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
3927
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc