• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.15
/source/libs/sync/src/syncUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncUtil.h"
18
#include "syncIndexMgr.h"
19
#include "syncMessage.h"
20
#include "syncPipeline.h"
21
#include "syncRaftCfg.h"
22
#include "syncRaftStore.h"
23
#include "syncSnapshot.h"
24
#include "tglobal.h"
25

26
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
252,275✔
27
  int32_t len = tsnprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
252,275✔
28
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
559,554✔
29
    len += tsnprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
307,275✔
30
    if (i < pCfg->replicaNum - 1) {
307,277✔
31
      len += tsnprintf(buf + len, bufLen - len, "%s", ", ");
55,003✔
32
    }
33
  }
34
  len += tsnprintf(buf + len, bufLen - len, "%s", "]}");
252,279✔
35
}
252,276✔
36

37
// Populate pEpSet with exactly one endpoint copied from pInfo: eps[0] receives
// the node's FQDN (bounded by TSDB_FQDN_LEN) and port, numOfEps becomes 1 and
// inUse becomes 0.
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
  tstrncpy(pEpSet->eps[0].fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
  pEpSet->eps[0].port = pInfo->nodePort;
  pEpSet->numOfEps = 1;
  pEpSet->inUse = 0;
}
7,416✔
43

44
// Resolve the FQDN in pInfo and, on success, fill raftId with SYNC_ADDR(pInfo)
// and vgId.
//
// Resolution is attempted up to tsResolveFQDNRetryTime times, sleeping one
// second after every failed attempt.
//
// Returns true when raftId was filled. Returns false and sets terrno to
// TSDB_CODE_TSC_INVALID_FQDN when the resolved value is still 0xFFFFFFFF or
// equals 1.
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
  uint32_t ipv4 = 0xFFFFFFFF;
  sDebug("vgId:%d, resolve sync addr from fqdn, ep:%s:%u", vgId, pInfo->nodeFqdn, pInfo->nodePort);

  int32_t attempt = 0;
  while (attempt < tsResolveFQDNRetryTime) {
    if (taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4) == 0) {
      break;
    }
    sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, retry", vgId, pInfo->nodeId, pInfo->nodeFqdn);
    taosSsleep(1);
    attempt++;
  }

  const bool unresolved = (ipv4 == 0xFFFFFFFF) || (ipv4 == 1);
  if (unresolved) {
    sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s", vgId, pInfo->nodeId, pInfo->nodeFqdn);
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return false;
  }

  // textual form of the address, used only for the log line below
  char ipbuf[TD_IP_LEN] = {0};
  tinet_ntoa(ipbuf, ipv4);

  raftId->vgId = vgId;
  raftId->addr = SYNC_ADDR(pInfo);

  sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId,
        raftId->addr, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4, pInfo->nodeId, pInfo->clusterId);
  return true;
}
72

73
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
44,300,328✔
74
  if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) {
44,300,328!
75
    return true;
28,564,915✔
76
  }
77

78
  if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) {
15,735,413!
79
    return true;
184✔
80
  }
81

82
  return false;
15,735,229✔
83
}
84

85
// Return true when both addr and vgId of the raft id are zero.
bool syncUtilEmptyId(const SRaftId* pId) {
  if (pId->addr != 0) {
    return false;
  }
  return pId->vgId == 0;
}
3,261!
86

87
// Return taosRand() reduced modulo max.
//
// A non-positive max describes an empty range; 0 is returned in that case so
// that the modulo below is never evaluated with a zero divisor (undefined
// behavior in C).
static inline int32_t syncUtilRand(int32_t max) {
  if (max <= 0) {
    return 0;
  }
  return taosRand() % max;
}
578,092✔
88

89
// Return min plus syncUtilRand(max - min).
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  const int32_t span = max - min;
  return min + syncUtilRand(span);
}
95

96
// Return the majority size for replicaNum members: floor(replicaNum / 2) + 1.
int32_t syncUtilQuorum(int32_t replicaNum) {
  const int32_t half = replicaNum / 2;
  return half + 1;
}
15,811✔
97

98
void syncUtilMsgHtoN(void* msg) {
5,899,658✔
99
  SMsgHead* pHead = msg;
5,899,658✔
100
  pHead->contLen = htonl(pHead->contLen);
5,899,658✔
101
  pHead->vgId = htonl(pHead->vgId);
5,899,658✔
102
}
5,899,658✔
103

104
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
15,533✔
105
  (void)memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
15,533✔
106
  int32_t randVal = taosSafeRand() % 1000;
15,533✔
107
  int64_t currentMs = taosGetTimestampMs();
15,533✔
108
  (void)snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal);
15,533✔
109
}
15,533✔
110

111
// for leader
112
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
233,162✔
113
  int32_t len = 0;
233,162✔
114
  len += tsnprintf(buf + len, bufLen - len, "%s", "{");
233,162✔
115
  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
520,025✔
116
    int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
286,853✔
117
    len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
286,857✔
118
    if (i < pSyncNode->replicaNum - 1) {
286,857✔
119
      len += tsnprintf(buf + len, bufLen - len, "%s", ",");
53,690✔
120
    }
121
  }
122
  len += tsnprintf(buf + len, bufLen - len, "%s", "}");
233,172✔
123
}
233,169✔
124

125
// for follower
126
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
233,165✔
127
  int32_t len = 0;
233,165✔
128
  len += tsnprintf(buf + len, bufLen - len, "%s", "{");
233,165✔
129
  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
520,027✔
130
    int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
286,854✔
131
    len += tsnprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
286,856✔
132
    if (i < pSyncNode->replicaNum - 1) {
286,862✔
133
      len += tsnprintf(buf + len, bufLen - len, "%s", ",");
53,692✔
134
    }
135
  }
136
  len += tsnprintf(buf + len, bufLen - len, "%s", "}");
233,173✔
137
}
233,166✔
138

139
// Render the node's log buffer as
// "[<startIndex> <commitIndex> <matchIndex>, <endIndex>)" into buf.
// buf is left untouched when the node has no log buffer.
static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  const SSyncLogBuffer* pLogBuf = pSyncNode->pLogBuf;
  if (pLogBuf != NULL) {
    (void)tsnprintf(buf, bufLen, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pLogBuf->startIndex,
                    pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
  }
}
148

149
// Render the per-replica log replication managers into buf as
// "{<i>:<restored> [<startIndex>, <matchIndex>, <endIndex>], ...}".
// Iteration stops at the first NULL manager; the closing "}" is always
// appended.
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; idx++) {
    const SSyncLogReplMgr* pRepl = pSyncNode->logReplMgrs[idx];
    if (pRepl == NULL) {
      break;
    }

    pos += tsnprintf(buf + pos, bufLen - pos, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "]", idx, pRepl->restored,
                     pRepl->startIndex, pRepl->matchIndex, pRepl->endIndex);

    // the separator follows every entry except the one at the last replica index
    const bool hasNext = (idx + 1 < pSyncNode->replicaNum);
    if (hasNext) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ", ");
    }
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
233,169✔
163

164
// Render the per-replica peer states into buf as
// "{<i>:<lastSendIndex> <lastSendTime>, ...}".
// Iteration stops at the first replica for which syncNodeGetPeerState()
// returns NULL; the closing "}" is always appended.
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    const SPeerState* pPeer = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[idx]));
    if (pPeer == NULL) {
      break;
    }

    // the separator is part of the entry itself, omitted at the last replica index
    const char* sep = "";
    if (idx < pSyncNode->replicaNum - 1) {
      sep = ", ";
    }
    pos += tsnprintf(buf + pos, bufLen - pos, "%d:%" PRId64 " %" PRId64 "%s", idx, pPeer->lastSendIndex,
                     pPeer->lastSendTime, sep);
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
19,109✔
175

176
// Print one log line describing the state of a sync node, prefixed by a
// caller-supplied, printf-style event message.
//
// flags, level, dflag - forwarded unchanged to taosPrintLog()
// pNode               - node to describe; nothing is logged when it is NULL
//                       or has no log store
// format, ...         - printf-style event message, truncated to fit the
//                       512-byte eventLog buffer
//
// terrno is saved on entry and restored on exit, so that logging does not
// disturb the caller's error code.
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  // save error code before calling anything else, otherwise it may be overwritten
  int32_t errCode = terrno;

  int64_t currentTerm = raftStoreGetTerm(pNode);

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL on entry
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char replMgrStatesStr[1024] = "";
  syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));

  char bufferStatesStr[256] = "";
  syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));

  char hbrTimeStr[256] = "";
  syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));

  char hbTimeStr[256] = "";
  syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));

  // initialised so that a failing vsnprintf() cannot leave indeterminate bytes to be printed
  char    eventLog[512] = "";
  va_list argpointer;
  va_start(argpointer, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);

  // both FSM callbacks are optional; fall back to neutral values when absent
  int32_t   aqItems = 0;
  SyncIndex appliedIndex = SYNC_INDEX_INVALID;
  if (pNode->pFsm != NULL) {
    if (pNode->pFsm->FpApplyQueueItems != NULL) {
      aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
    }
    if (pNode->pFsm->FpAppliedIndexCb != NULL) {
      appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
    }
  }

  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", assigned-index:%" PRId64
      ", applied-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
      ", snap-term:%" PRIu64
      ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
      "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
      ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s",
      pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
      appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
      snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
      pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
      syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
      replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken);

  // restore error code last, after every call that could have overwritten it
  terrno = errCode;
}
247

248
// Print one log line describing a snapshot sender and its owning sync node,
// prefixed by a caller-supplied, printf-style event message.
//
// flags, level, dflag - forwarded unchanged to taosPrintLog()
// pSender             - sender to describe; nothing is logged when its sync
//                       node is NULL or has no log store
// format, ...         - printf-style event message, truncated to fit the
//                       512-byte eventLog buffer
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
                                const char* format, ...) {
  SSyncNode* pNode = pSender->pSyncNode;
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  // initialised so that a failing vsnprintf() cannot leave indeterminate bytes to be printed
  char    eventLog[512] = "";
  va_list argpointer;
  va_start(argpointer, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);

  // %p requires a void* argument, hence the explicit cast of pSender
  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
               " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
               ", seq:%d, ack:%d, "
               " buf:[%" PRId64 " %" PRId64 ", %" PRId64
               "], finish:%d, as:%d, to-dnode:%d}"
               ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
               ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
               "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
               ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
               pNode->vgId, eventLog, syncStr(pNode->state), (void*)pSender, pSender->term, pSender->startTime,
               pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
               pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
               pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish,
               pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
               pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
               pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
}
296

297
// Print one log line describing a snapshot receiver and its owning sync node,
// prefixed by a caller-supplied, printf-style event message.
//
// flags, level, dflag - forwarded unchanged to taosPrintLog()
// pReceiver           - receiver to describe; nothing is logged when its
//                       sync node is NULL or has no log store
// format, ...         - printf-style event message, truncated to fit the
//                       512-byte eventLog buffer
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
                                  const char* format, ...) {
  SSyncNode* pNode = pReceiver->pSyncNode;
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  // initialised so that a failing vsnprintf() cannot leave indeterminate bytes to be printed
  char    eventLog[512] = "";
  va_list argpointer;
  va_start(argpointer, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);

  // %p requires a void* argument, hence the explicit cast of pReceiver
  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s,"
      " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64
      ")"
      " from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
      "}"
      ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
      ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
      pNode->vgId, eventLog, syncStr(pNode->state), (void*)pReceiver, pReceiver->term, pReceiver->startTime,
      pReceiver->start, pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor,
      pReceiver->pRcvBuf->end, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
      pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
      raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
      snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr,
      cfgStr);
}
345

346
// Log the receipt of a sync timer message, including the time elapsed between
// the message's timeStamp and now. Does nothing unless DEBUG_TRACE is set in
// sDebugFlag.
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  if ((sDebugFlag & DEBUG_TRACE) == 0) {
    return;
  }

  const int64_t nowMs = taosGetTimestampMs();
  const int64_t elapsedMs = nowMs - pMsg->timeStamp;

  sNTrace(
      pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
      syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, elapsedMs, pMsg->data, s);
}
355

356
// Log, through the sNTrace macro, the receipt of a local sync command: its
// numeric code and name (syncLocalCmdGetStr), currentTerm, commitIndex, and
// the caller-supplied note s.
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
          syncLocalCmdGetStr(pMsg->cmd), pMsg->currentTerm, pMsg->commitIndex, s);
}
43,618✔
360

361
// Log, through the sNTrace macro, that an append-entries reply is being sent
// to the dnode identified by pMsg->destId, followed by the caller-supplied
// note s.
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "send sync-append-entries-reply to dnode:%d, {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          DID(&pMsg->destId), pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}
×
367

368
// Log, through the sNTrace macro, that an append-entries reply was received
// from the dnode identified by pMsg->srcId, followed by the caller-supplied
// note s.
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from dnode:%d {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          DID(&pMsg->srcId), pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}
×
374

375
// Log that a heartbeat is being sent to the dnode identified by pMsg->destId.
//
// printX       - when true, the line ends with a literal "x" and the two
//                timing values are not printed
// timerElapsed - printed as "timer-elapsed" when printX is false
// execTime     - printed as "next-exec" when printX is false
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
                          int64_t execTime) {
  if (!printX) {
    sNTrace(pSyncNode,
            "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
            DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
            execTime);
    return;
  }

  sNTrace(pSyncNode,
          "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, x",
          DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
}
47,073✔
390

391
// Log, through the sNTrace macro, that a heartbeat was received from the
// dnode identified by pMsg->srcId.
//
// When timeDiff exceeds SYNC_HEARTBEAT_SLOW_MS, pSyncNode->hbSlowNum is
// incremented and an extra "slow" line is logged first. The regular line is
// logged in every case, so a slow heartbeat produces two log statements.
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
  if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
    pSyncNode->hbSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
            DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
  }

  sNTrace(pSyncNode,
          "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
          DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}
43,799✔
406

407
// Log, through the sNTrace macro, that a heartbeat reply is being sent to the
// dnode identified by pMsg->destId, followed by the caller-supplied note s.
//
// The message reads "to dnode" because the id printed is the destination,
// matching the wording of the other send-side log helpers in this file.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  sNTrace(pSyncNode, "send sync-heartbeat-reply to dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
          DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
}
×
411

412
// Log, through the sNTrace macro, that a heartbeat reply was received from
// the dnode identified by pMsg->srcId.
//
// When timeDiff exceeds SYNC_HEARTBEAT_REPLY_SLOW_MS, pSyncNode->hbrSlowNum is
// incremented and an extra "slow" line is logged first. The regular line is
// logged in every case, so a slow reply produces two log statements.
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s) {
  if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
    pSyncNode->hbrSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
  }

  sNTrace(pSyncNode,
          "recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
          DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
}
43,461✔
425

426
// Log, through the sNDebug macro, that a snapshot-send message is being sent
// to the dnode identified by pMsg->destId, with the caller-supplied note s.
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->destId), s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
×
433

434
// Log, through the sNDebug macro, that a snapshot-send message was received
// from the dnode identified by pMsg->srcId, with the caller-supplied note s
// and the message's dataLen.
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64 ", data-len:%u",
          DID(&pMsg->srcId), s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime, pMsg->dataLen);
}
×
441

442
// Log, through the sNDebug macro, that a snapshot response is being sent to
// the dnode identified by pMsg->destId, with the caller-supplied note s.
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "send sync-snapshot-rsp to dnode:%d, %s, acked:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->destId), s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
104✔
449

450
// Log, through the sNDebug macro, that a snapshot response was received from
// the dnode identified by pMsg->srcId, with the caller-supplied note s.
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->srcId), s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
×
457

458
// Log, through the sNTrace macro, that an append-entries message was received
// from the dnode identified by pMsg->srcId, followed by the caller-supplied
// note s.
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-append-entries from dnode:%d {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, commit-index:%" PRId64 ", datalen:%d}, %s",
          DID(&pMsg->srcId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->dataLen, s);
}
1✔
464

465
// Log, through the sNTrace macro, that an append-entries message is being
// sent to the dnode identified by pMsg->destId, followed by the
// caller-supplied note s. The "index" field printed is prevLogIndex + 1.
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "send sync-append-entries to dnode:%d, {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, index:%" PRId64 ", commit-index:%" PRId64 ", datalen:%d}, %s",
          DID(&pMsg->destId), pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1),
          pMsg->commitIndex, pMsg->dataLen, s);
}
×
472

473
// Log, through the sNInfo macro, a request-vote message from the dnode
// identified by pMsg->srcId.
//
// voteGranted - printed as "granted:<n>" unless it is -1
// errmsg      - printed instead of the "granted" text when voteGranted is -1;
//               a NULL errmsg falls back to the "granted" text, because
//               passing NULL for a %s conversion is undefined behavior
// opt         - leading word of the log line, supplied by the caller
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* errmsg,
                            const char* opt) {
  char statusMsg[64];
  (void)snprintf(statusMsg, sizeof(statusMsg), "granted:%d", voteGranted);

  const char* outcome = (voteGranted == -1 && errmsg != NULL) ? errmsg : statusMsg;

  sNInfo(pSyncNode,
         "%s sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         opt, DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, outcome);
}
4,418✔
482

483
// Log, through the sNInfo macro, that a request-vote message is being sent to
// the dnode identified by pMsg->destId, followed by the caller-supplied
// note s.
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
  sNInfo(pNode,
         "send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         DID(&pMsg->destId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
×
488

489
// Log, through the sNInfo macro, that a request-vote reply was received from
// the dnode identified by pMsg->srcId, with its term and voteGranted flag and
// the caller-supplied note s.
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  sNInfo(pSyncNode, "recv sync-request-vote-reply from dnode:%d {term:%" PRId64 ", grant:%d}, %s", DID(&pMsg->srcId),
         pMsg->term, pMsg->voteGranted, s);
}
2,125✔
493

494
// Log, through the sNInfo macro, that a request-vote reply is being sent to
// the dnode identified by pMsg->destId, with its term and voteGranted flag
// and the caller-supplied note s.
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  sNInfo(pSyncNode, "send sync-request-vote-reply to dnode:%d {term:%" PRId64 ", grant:%d}, %s", DID(&pMsg->destId),
         pMsg->term, pMsg->voteGranted, s);
}
2,208✔
498

499
// Resize pSnap->data to size bytes with taosMemoryRealloc().
//
// Returns 0 and stores the new pointer on success. When the reallocation
// returns NULL, pSnap->data is left as it was and terrno is returned.
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
  void* resized = taosMemoryRealloc(pSnap->data, size);
  if (resized != NULL) {
    pSnap->data = resized;
    return 0;
  }
  return terrno;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc