• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.31
/source/libs/sync/src/syncUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncUtil.h"
18
#include "syncIndexMgr.h"
19
#include "syncMessage.h"
20
#include "syncPipeline.h"
21
#include "syncRaftCfg.h"
22
#include "syncRaftStore.h"
23
#include "syncSnapshot.h"
24
#include "tglobal.h"
25
#include "ttime.h"
26

27
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
246,884✔
28
  int32_t len = tsnprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
246,884✔
29
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
541,736✔
30
    len += tsnprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
294,837✔
31
    if (i < pCfg->replicaNum - 1) {
294,846✔
32
      len += tsnprintf(buf + len, bufLen - len, "%s", ", ");
47,951✔
33
    }
34
  }
35
  len += tsnprintf(buf + len, bufLen - len, "%s", "]}");
246,899✔
36
}
246,897✔
37

38
/*
 * Fill pEpSet with a single endpoint taken from pInfo: the FQDN is copied
 * (bounded by TSDB_FQDN_LEN) and the port is taken as-is. The set reports one
 * endpoint, with index 0 in use.
 */
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
  pEpSet->numOfEps = 1;
  pEpSet->inUse = 0;

  tstrncpy(pEpSet->eps[0].fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
  pEpSet->eps[0].port = pInfo->nodePort;
}
7,318✔
44

45
/*
 * Resolve the FQDN of a sync node and fill in its raft id.
 *
 * Resolution is attempted up to tsResolveFQDNRetryTime times, sleeping one
 * second after every failed attempt. If no usable address was obtained,
 * terrno is set to TSDB_CODE_TSC_INVALID_FQDN and false is returned.
 * Otherwise raftId->addr is set from SYNC_ADDR(pInfo), raftId->vgId from
 * vgId, and true is returned.
 */
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
  uint32_t ipv4 = 0xFFFFFFFF;
  sDebug("vgId:%d, resolve sync addr from fqdn, ep:%s:%u", vgId, pInfo->nodeFqdn, pInfo->nodePort);

  int32_t attempt = 0;
  while (attempt < tsResolveFQDNRetryTime) {
    if (taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4) == 0) {
      break;
    }
    sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, retry", vgId, pInfo->nodeId, pInfo->nodeFqdn);
    taosSsleep(1);
    ++attempt;
  }

  // 0xFFFFFFFF is the initial sentinel; a value of 1 is likewise treated as unresolved.
  const bool resolved = (ipv4 != 0xFFFFFFFF) && (ipv4 != 1);
  if (!resolved) {
    sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s", vgId, pInfo->nodeId, pInfo->nodeFqdn);
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return false;
  }

  // The dotted form is only needed for the log line below.
  char ipbuf[TD_IP_LEN] = {0};
  tinet_ntoa(ipbuf, ipv4);

  raftId->vgId = vgId;
  raftId->addr = SYNC_ADDR(pInfo);

  sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId,
        raftId->addr, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4, pInfo->nodeId, pInfo->clusterId);
  return true;
}
73

74
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
50,328,339✔
75
  if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) {
50,328,339!
76
    return true;
31,746,284✔
77
  }
78

79
  if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) {
18,582,055!
80
    return true;
184✔
81
  }
82

83
  return false;
18,581,871✔
84
}
85

86
/* Return true when both the addr and the vgId of the raft id are zero. */
bool syncUtilEmptyId(const SRaftId* pId) {
  if (pId->addr != 0) {
    return false;
  }
  return pId->vgId == 0;
}
3,196!
87

88
/* Return taosRand() reduced modulo max. The caller must pass a non-zero max. */
static inline int32_t syncUtilRand(int32_t max) {
  return taosRand() % max;
}
603,877✔
89

90
/*
 * Pick a randomized election timeout.
 *
 * Returns min plus syncUtilRand(max - min). When the range is empty or
 * inverted (max <= min) there is nothing to randomize, and computing a
 * remainder by zero would be undefined behaviour, so min is returned as-is.
 */
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  if (max <= min) {
    return min;
  }

  return min + syncUtilRand(max - min);
}
96

97
/* Return the majority size for a group of replicaNum replicas: replicaNum / 2 + 1. */
int32_t syncUtilQuorum(int32_t replicaNum) {
  const int32_t half = replicaNum / 2;
  return half + 1;
}
15,441✔
98

99
void syncUtilMsgHtoN(void* msg) {
5,930,427✔
100
  SMsgHead* pHead = msg;
5,930,427✔
101
  pHead->contLen = htonl(pHead->contLen);
5,930,427✔
102
  pHead->vgId = htonl(pHead->vgId);
5,930,427✔
103
}
5,930,427✔
104

105
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
15,163✔
106
  (void)memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
15,163✔
107
  int32_t randVal = taosSafeRand() % 1000;
15,163✔
108
  int64_t currentMs = taosGetTimestampMs();
15,163✔
109
  (void)snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal);
15,163✔
110
}
15,163✔
111

112
static void syncPrintTime(bool formatTime, int32_t* len, int64_t tsMs, int32_t i, char* buf, int32_t bufLen) {
823,419✔
113
  if (formatTime) {
823,419✔
114
    char pBuf[TD_TIME_STR_LEN] = {0};
319,306✔
115
    if (tsMs > 0) {
319,306✔
116
      if (taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI) != 0) {
218,929!
NEW
117
        pBuf[0] = '\0';
×
118
      }
119
    }
120
    (*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%s", i, pBuf);
319,310✔
121
  } else {
122
    (*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%" PRId64, i, tsMs);
504,113✔
123
  }
124
}
823,428✔
125

126
// for leader
127
/*
 * Build "{0:<t0>,1:<t1>,...}" in buf, where each <t> is the receive time
 * recorded in pSyncNode->pMatchIndex for the corresponding replica.
 * formatTime is forwarded to syncPrintTime to select the time rendering.
 */
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    int64_t recvMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[idx]));
    syncPrintTime(formatTime, &pos, recvMs, idx, buf, bufLen);

    // A comma follows every entry except the last one.
    if (idx + 1 < pSyncNode->replicaNum) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ",");
    }
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
227,884✔
139

140
/*
 * Build "{0:<t0>,1:<t1>,...}" in buf, where each <t> is the sent time
 * recorded in pSyncNode->pMatchIndex for the corresponding replica.
 * formatTime is forwarded to syncPrintTime to select the time rendering.
 */
static void syncSentHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    int64_t sentMs = syncIndexMgrGetSentTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[idx]));
    syncPrintTime(formatTime, &pos, sentMs, idx, buf, bufLen);

    // A comma follows every entry except the last one.
    if (idx + 1 < pSyncNode->replicaNum) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ",");
    }
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
227,879✔
152

153
// for follower
154
/*
 * Build "{0:<t0>,1:<t1>,...}" in buf, where each <t> is the receive time
 * recorded in pSyncNode->pNextIndex for the corresponding replica.
 * formatTime is forwarded to syncPrintTime to select the time rendering.
 */
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    int64_t recvMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[idx]));
    syncPrintTime(formatTime, &pos, recvMs, idx, buf, bufLen);

    // A comma follows every entry except the last one.
    if (idx + 1 < pSyncNode->replicaNum) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ",");
    }
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
227,881✔
166

167
/*
 * Write the log buffer indexes of pSyncNode into buf as
 * "[<start> <commit> <match>, <end>)". buf is left untouched when the node
 * has no log buffer.
 */
static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  SSyncLogBuffer* pLogBuf = pSyncNode->pLogBuf;

  if (pLogBuf != NULL) {
    (void)tsnprintf(buf, bufLen, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pLogBuf->startIndex,
                    pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
  }
}
176

177
/*
 * Summarize the per-replica log replication managers of pSyncNode into buf as
 * "{<i>:<restored> [<start>, <match>, <end>] <sendCount>, ...}".
 * Iteration stops at the first replica whose manager pointer is NULL.
 */
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; idx++) {
    SSyncLogReplMgr* pReplMgr = pSyncNode->logReplMgrs[idx];
    if (pReplMgr == NULL) {
      break;
    }

    pos += tsnprintf(buf + pos, bufLen - pos, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "] ", idx,
                     pReplMgr->restored, pReplMgr->startIndex, pReplMgr->matchIndex, pReplMgr->endIndex);
    pos += tsnprintf(buf + pos, bufLen - pos, "%d", pReplMgr->sendCount);

    // Entries are separated by ", "; the last replica gets no separator.
    if (idx + 1 < pSyncNode->replicaNum) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ", ");
    }
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
227,880✔
192

193
/*
 * Summarize the peer state of every replica of pSyncNode into buf as
 * "{<i>:<lastSendIndex> <lastSendTime>, ...}". Iteration stops at the first
 * replica for which syncNodeGetPeerState returns NULL.
 */
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    SPeerState* pPeer = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[idx]));
    if (pPeer == NULL) {
      break;
    }

    // Only non-final entries are followed by a separator.
    const char* sep = "";
    if (idx < pSyncNode->replicaNum - 1) {
      sep = ", ";
    }
    pos += tsnprintf(buf + pos, bufLen - pos, "%d:%" PRId64 " %" PRId64 "%s", idx, pPeer->lastSendIndex,
                     pPeer->lastSendTime, sep);
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
19,012✔
204

205
/*
 * Emit one log line describing the state of a sync node, prefixed by the
 * caller's printf-style event message.
 *
 * flags, level, dflag  forwarded unchanged to taosPrintLog
 * formatTime           when true, heartbeat times are rendered as formatted
 *                      UTC times instead of raw millisecond values
 * pNode                node to describe; nothing is logged when it is NULL or
 *                      has no log store
 * format, ...          printf-style event message (truncated to 511 bytes)
 *
 * terrno is saved on entry and restored before returning, so helpers and FSM
 * callbacks invoked while gathering state cannot disturb the caller's error
 * code.
 */
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
                      const char* format, ...) {
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  // Save the error code before calling anything that might overwrite it.
  const int32_t errCode = terrno;

  int64_t currentTerm = raftStoreGetTerm(pNode);

  // Every FSM callback is optional; fall back to neutral values when one is missing.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  int32_t   aqItems = 0;
  SyncIndex appliedIndex = SYNC_INDEX_INVALID;
  if (pNode->pFsm != NULL) {
    if (pNode->pFsm->FpGetSnapshotInfo != NULL) {
      (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
    }
    if (pNode->pFsm->FpApplyQueueItems != NULL) {
      aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
    }
    if (pNode->pFsm->FpAppliedIndexCb != NULL) {
      appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
    }
  }

  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char replMgrStatesStr[1024] = "";
  syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));

  char bufferStatesStr[256] = "";
  syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));

  char hbrTimeStr[256] = "";
  syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime);

  char hbTimeStr[256] = "";
  syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime);

  char sentHbTimeStr[512] = "";
  syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime);

  // eventLog is deliberately not zero-filled; vsnprintf terminates it on success.
  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  if (vsnprintf(eventLog, sizeof(eventLog), format, argpointer) < 0) {
    // On an output error the buffer contents are unspecified; log an empty event instead.
    eventLog[0] = '\0';
  }
  va_end(argpointer);

  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", assigned-index:%" PRId64
      ", applied-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
      ", snap-term:%" PRIu64
      ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
      "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
      ", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
      pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
      appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
      snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
      pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
      syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
      replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount,
      pNode->recvCount, pNode->slowCount);

  // Restore the caller's error code last, after every helper and callback has run.
  terrno = errCode;
}
281

282
/*
 * Emit one log line describing a snapshot sender together with the state of
 * its owning sync node, prefixed by the caller's printf-style event message.
 * Nothing is logged when the sender's node is NULL or has no log store.
 * flags, level and dflag are forwarded unchanged to taosPrintLog.
 */
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
                                const char* format, ...) {
  SSyncNode* pNode = pSender->pSyncNode;
  if (pNode == NULL || pNode->pLogStore == NULL) {
    return;
  }

  // Snapshot info comes from the FSM when it provides the callback.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // The log store is known to be non-NULL at this point.
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  // Format the caller's event message.
  char    eventLog[512];
  va_list args;
  va_start(args, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, args);
  va_end(args);

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
               " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
               ", seq:%d, ack:%d, "
               " buf:[%" PRId64 " %" PRId64 ", %" PRId64
               "], finish:%d, as:%d, to-dnode:%d}"
               ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
               ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
               "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
               ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
               pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
               pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
               pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
               pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish,
               pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
               pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
               pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
}
330

331
/*
 * Emit one log line describing a snapshot receiver together with the state of
 * its owning sync node, prefixed by the caller's printf-style event message.
 * Nothing is logged when the receiver's node is NULL or has no log store.
 * flags, level and dflag are forwarded unchanged to taosPrintLog.
 */
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
                                  const char* format, ...) {
  SSyncNode* pNode = pReceiver->pSyncNode;
  if (pNode == NULL || pNode->pLogStore == NULL) {
    return;
  }

  // Snapshot info comes from the FSM when it provides the callback.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // The log store is known to be non-NULL at this point.
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  // Format the caller's event message.
  char    eventLog[512];
  va_list args;
  va_start(args, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, args);
  va_end(args);

  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s,"
      " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64
      ")"
      " from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
      "}"
      ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
      ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
      pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start,
      pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor, pReceiver->pRcvBuf->end,
      DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
      pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
      raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
      snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr,
      cfgStr);
}
379

380
/*
 * Trace-log a received sync timer message, including the time elapsed since
 * the message's own timestamp. Returns immediately, before reading the clock,
 * when trace logging is disabled in sDebugFlag.
 */
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  if ((sDebugFlag & DEBUG_TRACE) == 0) {
    return;
  }

  int64_t elapsedMs = taosGetTimestampMs() - pMsg->timeStamp;
  sNTrace(
      pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
      syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, elapsedMs, pMsg->data, s);
}
389

390
/* Trace-log a received sync local command; s is appended as a free-form suffix. */
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
          pMsg->cmd, syncLocalCmdGetStr(pMsg->cmd),
          pMsg->currentTerm, pMsg->commitIndex,
          s);
}
43,349✔
394

395
/* Trace-log an append-entries reply about to be sent to pMsg->destId; s is a free-form suffix. */
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "send sync-append-entries-reply to dnode:%d, {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          DID(&pMsg->destId),
          pMsg->term, pMsg->lastMatchTerm,
          pMsg->success,
          pMsg->lastSendIndex, pMsg->matchIndex,
          s);
}
×
401

402
/* Trace-log an append-entries reply received from pMsg->srcId; s is a free-form suffix. */
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from dnode:%d {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          DID(&pMsg->srcId),
          pMsg->term, pMsg->lastMatchTerm,
          pMsg->success,
          pMsg->lastSendIndex, pMsg->matchIndex,
          s);
}
×
408

409
/*
 * Trace-log a heartbeat about to be sent to pMsg->destId.
 * With printX set the line ends in ", x"; otherwise it carries the
 * timerElapsed and execTime values supplied by the caller.
 */
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
                          int64_t execTime) {
  if (!printX) {
    sNTrace(pSyncNode,
            "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
            DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
            execTime);
  } else {
    sNTrace(pSyncNode,
            "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, x",
            DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
  }
}
46,549✔
424

425
/*
 * Trace-log a heartbeat received from pMsg->srcId.
 *
 * When timeDiff exceeds SYNC_HEARTBEAT_SLOW_MS the node's hbSlowNum counter
 * is incremented and an additional "slow" line is logged before the regular
 * line. s is printed in the QID field.
 */
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
  const bool isSlow = timeDiff > SYNC_HEARTBEAT_SLOW_MS;

  if (isSlow) {
    pSyncNode->hbSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
            DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
  }

  // The regular line is logged in every case, slow or not.
  sNTrace(pSyncNode,
          "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
          DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}
43,452✔
440

441
/*
 * Trace-log a heartbeat reply about to be sent to pMsg->destId; s is appended
 * as a free-form suffix. The peer printed is the destination, so the message
 * reads "to dnode", matching the other send-side log helpers.
 */
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  sNTrace(pSyncNode, "send sync-heartbeat-reply to dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
          DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
}
×
445

446
/*
 * Trace-log a heartbeat reply received from pMsg->srcId.
 *
 * When timeDiff exceeds SYNC_HEARTBEAT_REPLY_SLOW_MS the node's hbrSlowNum
 * counter is incremented and an additional "slow" line is logged before the
 * regular line. s is appended as a free-form field.
 */
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s) {
  const bool isSlow = timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS;

  if (isSlow) {
    pSyncNode->hbrSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
  }

  // The regular line is logged in every case, slow or not.
  sNTrace(pSyncNode,
          "recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
          DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
}
43,212✔
459

460
/* Debug-log a snapshot-send message about to be sent to pMsg->destId; s is a free-form field. */
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->destId), s,
          pMsg->seq, pMsg->term,
          pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
×
467

468
/* Debug-log a snapshot-send message received from pMsg->srcId, including its data length; s is a free-form field. */
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64 ", data-len:%u",
          DID(&pMsg->srcId), s,
          pMsg->seq, pMsg->term,
          pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime, pMsg->dataLen);
}
×
475

476
/* Debug-log a snapshot response about to be sent to pMsg->destId; s is a free-form field. */
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "send sync-snapshot-rsp to dnode:%d, %s, acked:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->destId), s,
          pMsg->ack, pMsg->term,
          pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
105✔
483

484
/* Debug-log a snapshot response received from pMsg->srcId; s is a free-form field. */
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->srcId), s,
          pMsg->ack, pMsg->term,
          pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
×
491

UNCOV
492
/* Trace-log an append-entries message received from pMsg->srcId; s is a free-form suffix. */
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-append-entries from dnode:%d {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, commit-index:%" PRId64 ", datalen:%d}, %s",
          DID(&pMsg->srcId),
          pMsg->term,
          pMsg->prevLogIndex, pMsg->prevLogTerm,
          pMsg->commitIndex, pMsg->dataLen,
          s);
}
×
498

499
/*
 * Trace-log an append-entries message about to be sent to pMsg->destId.
 * The "index" field is printed as pMsg->prevLogIndex + 1; s is a free-form suffix.
 */
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "send sync-append-entries to dnode:%d, {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, index:%" PRId64 ", commit-index:%" PRId64 ", datalen:%d}, %s",
          DID(&pMsg->destId),
          pMsg->term,
          pMsg->prevLogIndex, pMsg->prevLogTerm,
          (pMsg->prevLogIndex + 1),
          pMsg->commitIndex, pMsg->dataLen,
          s);
}
×
506

507
/*
 * Info-log a request-vote message from pMsg->srcId.
 *
 * opt is printed as the leading word of the line. The trailing field is
 * "granted:<voteGranted>" unless voteGranted is -1, in which case errmsg is
 * printed instead.
 */
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* errmsg,
                            const char* opt) {
  char statusMsg[64];
  snprintf(statusMsg, sizeof(statusMsg), "granted:%d", voteGranted);

  const char* outcome = (voteGranted == -1) ? errmsg : statusMsg;

  sNInfo(pSyncNode,
         "%s sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         opt, DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, outcome);
}
4,326✔
516

517
/* Info-log a request-vote message about to be sent to pMsg->destId; s is a free-form suffix. */
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
  sNInfo(pNode,
         "send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         DID(&pMsg->destId),
         pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm,
         s);
}
×
522

523
/* Info-log a request-vote reply received from pMsg->srcId; s is a free-form suffix. */
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  sNInfo(pSyncNode,
         "recv sync-request-vote-reply from dnode:%d {term:%" PRId64 ", grant:%d}, %s",
         DID(&pMsg->srcId),
         pMsg->term, pMsg->voteGranted,
         s);
}
2,121✔
527

528
/* Info-log a request-vote reply about to be sent to pMsg->destId; s is a free-form suffix. */
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  sNInfo(pSyncNode,
         "send sync-request-vote-reply to dnode:%d {term:%" PRId64 ", grant:%d}, %s",
         DID(&pMsg->destId),
         pMsg->term, pMsg->voteGranted,
         s);
}
2,163✔
532

533
/*
 * Resize the data buffer of pSnap to size bytes via taosMemoryRealloc.
 *
 * Returns 0 on success, with pSnap->data pointing at the resized buffer.
 * When the reallocation returns NULL, pSnap->data is left unchanged and
 * terrno is returned.
 */
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
  void* pResized = taosMemoryRealloc(pSnap->data, size);
  if (pResized != NULL) {
    pSnap->data = pResized;
    return 0;
  }
  return terrno;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc