• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3584

17 Jan 2025 07:28AM UTC coverage: 63.756% (-0.1%) from 63.876%
#3584

push

travis-ci

web-flow
Merge pull request #29594 from taosdata/fix/insert-when-2-replicas

fix/insert-when-2-replicas

141233 of 284535 branches covered (49.64%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

684 existing lines in 111 files now uncovered.

219774 of 281695 relevant lines covered (78.02%)

18696822.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.43
/source/libs/sync/src/syncUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncUtil.h"
18
#include "syncIndexMgr.h"
19
#include "syncMessage.h"
20
#include "syncPipeline.h"
21
#include "syncRaftCfg.h"
22
#include "syncRaftStore.h"
23
#include "syncSnapshot.h"
24
#include "tglobal.h"
25
#include "ttime.h"
26

27
#define FQDNRETRYTIMES 100
28

29
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
211,355✔
30
  int32_t len = tsnprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
211,355✔
31
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
472,854✔
32
    len += tsnprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
261,487✔
33
    if (i < pCfg->replicaNum - 1) {
261,505✔
34
      len += tsnprintf(buf + len, bufLen - len, "%s", ", ");
50,146✔
35
    }
36
  }
37
  len += tsnprintf(buf + len, bufLen - len, "%s", "]}");
211,367✔
38
}
211,360✔
39

40
// Build a one-entry endpoint set from a node info: the single endpoint carries the node's
// fqdn and port, and it is selected as the endpoint in use (index 0).
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
  tstrncpy(pEpSet->eps[0].fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
  pEpSet->eps[0].port = pInfo->nodePort;

  pEpSet->numOfEps = 1;
  pEpSet->inUse = 0;
}
7,169✔
46

47
// Resolve the node's fqdn and fill raftId (addr from SYNC_ADDR(pInfo), vgId from the argument).
// The lookup is attempted up to FQDNRETRYTIMES times with a 1 second sleep after each failure.
// Returns false and sets terrno to TSDB_CODE_TSC_INVALID_FQDN when no usable ipv4 was obtained;
// returns true otherwise. The resolved ip is only used for logging.
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
  uint32_t ipv4 = 0xFFFFFFFF;  // sentinel: stays unchanged if resolution never succeeds
  sDebug("vgId:%d, resolve sync addr from fqdn, ep:%s:%u", vgId, pInfo->nodeFqdn, pInfo->nodePort);

  int32_t attempt = 0;
  while (attempt < FQDNRETRYTIMES) {
    if (taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4) == 0) {
      break;
    }
    sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, retry", vgId, pInfo->nodeId, pInfo->nodeFqdn);
    taosSsleep(1);
    ++attempt;
  }

  // NOTE(review): the value 1 is also rejected here; its meaning is not visible in this file
  const bool unresolved = (ipv4 == 0xFFFFFFFF) || (ipv4 == 1);
  if (unresolved) {
    sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s", vgId, pInfo->nodeId, pInfo->nodeFqdn);
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return false;
  }

  char ipbuf[TD_IP_LEN] = {0};
  taosInetNtoa(ipbuf, ipv4);
  raftId->addr = SYNC_ADDR(pInfo);
  raftId->vgId = vgId;

  sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId,
        raftId->addr, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4, pInfo->nodeId, pInfo->clusterId);
  return true;
}
76

77
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
48,085,329✔
78
  if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) {
48,085,329!
79
    return true;
30,444,154✔
80
  }
81

82
  if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) {
17,641,175!
83
    return true;
196✔
84
  }
85

86
  return false;
17,640,979✔
87
}
88

89
// A raft id is empty when both its addr and its vgId are zero.
bool syncUtilEmptyId(const SRaftId* pId) {
  const bool hasValue = (pId->addr != 0) || (pId->vgId != 0);
  return !hasValue;
}
3,090!
90

91
// Return taosRand() reduced modulo max.
// A non-positive max yields 0 instead of evaluating "% max": a modulo by zero is undefined
// behaviour in C, and callers pass a computed range (max - min) that can collapse to zero.
static inline int32_t syncUtilRand(int32_t max) {
  if (max <= 0) {
    return 0;
  }
  return taosRand() % max;
}
512,982✔
92

93
// Pick a value of the form min + syncUtilRand(max - min).
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  const int32_t span = max - min;
  return min + syncUtilRand(span);
}
99

100
// Majority size for a replica group: half of replicaNum (integer division) plus one.
int32_t syncUtilQuorum(int32_t replicaNum) {
  const int32_t half = replicaNum / 2;
  return half + 1;
}
14,183✔
101

102
void syncUtilMsgHtoN(void* msg) {
5,573,831✔
103
  SMsgHead* pHead = msg;
5,573,831✔
104
  pHead->contLen = htonl(pHead->contLen);
5,573,831✔
105
  pHead->vgId = htonl(pHead->vgId);
5,573,831✔
106
}
5,573,831✔
107

108
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
13,880✔
109
  (void)memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
13,880✔
110
  int32_t randVal = taosSafeRand() % 1000;
13,880✔
111
  int64_t currentMs = taosGetTimestampMs();
13,880✔
112
  (void)snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal);
13,880✔
113
}
13,880✔
114

115
static void syncPrintTime(bool formatTime, int32_t* len, int64_t tsMs, int32_t i, char* buf, int32_t bufLen) {
725,407✔
116
  if (formatTime) {
725,407✔
117
    char pBuf[TD_TIME_STR_LEN] = {0};
318,764✔
118
    if (tsMs > 0) {
318,764✔
119
      if (taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI) != 0) {
218,674!
120
        pBuf[0] = '\0';
×
121
      }
122
    }
123
    (*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%s", i, pBuf);
318,761✔
124
  } else {
125
    (*len) += tsnprintf(buf + (*len), bufLen - (*len), "%d:%" PRId64, i, tsMs);
406,643✔
126
  }
127
}
725,430✔
128

129
// for leader
130
// Format "{0:<t0>,1:<t1>,...}" where <ti> is syncIndexMgrGetRecvTime() on pMatchIndex for
// replica i, rendered by syncPrintTime (raw ms or formatted, depending on formatTime).
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    // comma goes in front of every entry except the first
    if (idx > 0) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ",");
    }
    int64_t recvMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[idx]));
    syncPrintTime(formatTime, &pos, recvMs, idx, buf, bufLen);
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
192,713✔
142

143
// Format "{0:<t0>,1:<t1>,...}" where <ti> is syncIndexMgrGetSentTime() on pMatchIndex for
// replica i, rendered by syncPrintTime (raw ms or formatted, depending on formatTime).
static void syncSentHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    // comma goes in front of every entry except the first
    if (idx > 0) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ",");
    }
    int64_t sentMs = syncIndexMgrGetSentTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[idx]));
    syncPrintTime(formatTime, &pos, sentMs, idx, buf, bufLen);
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
192,716✔
155

156
// for follower
157
// Format "{0:<t0>,1:<t1>,...}" where <ti> is syncIndexMgrGetRecvTime() on pNextIndex for
// replica i, rendered by syncPrintTime (raw ms or formatted, depending on formatTime).
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen, bool formatTime) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    // comma goes in front of every entry except the first
    if (idx > 0) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ",");
    }
    int64_t recvMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[idx]));
    syncPrintTime(formatTime, &pos, recvMs, idx, buf, bufLen);
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
192,717✔
169

170
// Format the node's log buffer as "[<startIndex> <commitIndex> <matchIndex>, <endIndex>)".
// buf is left untouched when the node has no log buffer.
static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  SSyncLogBuffer* pLogBuf = pSyncNode->pLogBuf;
  if (pLogBuf != NULL) {
    (void)tsnprintf(buf, bufLen, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pLogBuf->startIndex,
                    pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
  }
}
179

180
// Format the per-replica log replication managers as
//   "{<i>:<restored> [<startIndex>, <matchIndex>, <endIndex>] <sendCount>, ...}".
// Iteration stops at the first NULL manager; the closing brace is still written.
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  int32_t idx = 0;
  while (idx < pSyncNode->replicaNum) {
    SSyncLogReplMgr* pReplMgr = pSyncNode->logReplMgrs[idx];
    if (pReplMgr == NULL) {
      break;
    }

    pos += tsnprintf(buf + pos, bufLen - pos, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "] ", idx,
                     pReplMgr->restored, pReplMgr->startIndex, pReplMgr->matchIndex, pReplMgr->endIndex);
    pos += tsnprintf(buf + pos, bufLen - pos, "%d", pReplMgr->sendCount);

    // separator after every entry except the last replica
    const bool hasNext = (idx + 1 < pSyncNode->replicaNum);
    if (hasNext) {
      pos += tsnprintf(buf + pos, bufLen - pos, "%s", ", ");
    }
    idx++;
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
192,704✔
195

196
// Format the peer states as "{<i>:<lastSendIndex> <lastSendTime>, ...}".
// Iteration stops at the first replica for which syncNodeGetPeerState() returns NULL;
// the closing brace is still written.
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t pos = tsnprintf(buf, bufLen, "%s", "{");

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    SPeerState* pPeer = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[idx]));
    if (pPeer == NULL) {
      break;
    }

    // trailing separator for every entry except the last replica
    const char* sep = "";
    if (idx < pSyncNode->replicaNum - 1) {
      sep = ", ";
    }
    pos += tsnprintf(buf + pos, bufLen - pos, "%d:%" PRId64 " %" PRId64 "%s", idx, pPeer->lastSendIndex,
                     pPeer->lastSendTime, sep);
  }

  (void)tsnprintf(buf + pos, bufLen - pos, "%s", "}");
}
18,647✔
207

208
// Print one log line that combines the caller's message (format + varargs) with a snapshot
// of the sync node state: term, indexes, snapshot info, counters, log buffer / replication
// manager states, member config and heartbeat times.
//
//   flags, level, dflag  forwarded unchanged to taosPrintLog
//   formatTime           forwarded to the heartbeat-time formatters (formatted vs. raw ms)
//   pNode                node to describe; nothing is printed when it or its pLogStore is NULL
//   format, ...          printf-style event message, rendered into a 512-byte buffer
//
// terrno is saved before the state is collected and restored once all callbacks have run,
// so that collecting the state does not overwrite the caller's error code.
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
                      const char* format, ...) {
  if (pNode == NULL || pNode->pLogStore == NULL) return;
  int64_t currentTerm = raftStoreGetTerm(pNode);

  // save error code, otherwise it will be overwritten
  int32_t errCode = terrno;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was checked at the top of the function, so it can be used directly
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  int32_t cacheHit = pNode->pLogStore->cacheHit;
  int32_t cacheMiss = pNode->pLogStore->cacheMiss;
  (void)cacheHit;   // read but not part of the log line
  (void)cacheMiss;  // read but not part of the log line

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char replMgrStatesStr[1024] = "";
  syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));

  char bufferStatesStr[256] = "";
  syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));

  char hbrTimeStr[256] = "";
  syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime);

  char hbTimeStr[256] = "";
  syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime);

  char sentHbTimeStr[512] = "";
  syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime);

  char    eventLog[512];  // intentionally not zero-filled; vsnprintf terminates it on success
  va_list argpointer;
  va_start(argpointer, format);
  int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);
  if (writeLen < 0) {
    // encoding error: the buffer content is unspecified, so never print it as a string
    eventLog[0] = '\0';
  }

  int32_t aqItems = 0;
  if (pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
    aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
  }

  // guarded like the other fsm callbacks above: pFsm or the callback may be unset
  SyncIndex appliedIndex = SYNC_INDEX_INVALID;
  if (pNode->pFsm != NULL && pNode->pFsm->FpAppliedIndexCb != NULL) {
    appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  }

  // restore error code only after the last callback has run
  terrno = errCode;

  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", assigned-index:%" PRId64
      ", applied-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
      ", snap-term:%" PRIu64
      ", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
      "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
      ", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
      pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
      appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
      snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
      pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
      syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
      replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
      pNode->slowCount);
}
284

285
// Print one log line that combines the caller's message (format + varargs) with the state
// of a snapshot sender (signature, snapshot range, seq/ack, send buffer, target replica)
// and of its owning sync node. Nothing is printed when the sender's node or the node's
// pLogStore is NULL. flags/level/dflag are forwarded unchanged to taosPrintLog.
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
                                const char* format, ...) {
  SSyncNode* pNode = pSender->pSyncNode;
  if (pNode == NULL) return;
  if (pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore is non-NULL here (checked above)
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  // render the caller's event message
  char    eventLog[512];
  va_list args;
  va_start(args, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, args);
  va_end(args);

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
               " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
               ", seq:%d, ack:%d, "
               " buf:[%" PRId64 " %" PRId64 ", %" PRId64
               "], finish:%d, as:%d, to-dnode:%d}"
               ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
               ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
               "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
               ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
               // node identity, event and sender signature
               pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
               // snapshot being sent
               pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
               pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
               // progress and send buffer
               pSender->seq, pSender->ack, pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end,
               pSender->finish, pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]),
               // owning node state
               raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
               snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
               pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
               pNode->quorum, peerStr, cfgStr);
}
333

334
// Print one log line that combines the caller's message (format + varargs) with the state
// of a snapshot receiver (signature, start/ack, receive buffer, source dnode, snapshot range)
// and of its owning sync node. Nothing is printed when the receiver's node or the node's
// pLogStore is NULL. flags/level/dflag are forwarded unchanged to taosPrintLog.
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
                                  const char* format, ...) {
  SSyncNode* pNode = pReceiver->pSyncNode;
  if (pNode == NULL) return;
  if (pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore is non-NULL here (checked above)
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  // render the caller's event message
  char    eventLog[512];
  va_list args;
  va_start(args, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, args);
  va_end(args);

  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s,"
      " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64
      ")"
      " from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
      "}"
      ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
      ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
      // node identity, event and receiver signature
      pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime,
      // progress and receive buffer
      pReceiver->start, pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor,
      pReceiver->pRcvBuf->end,
      // source and snapshot being received
      DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
      pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
      // owning node state
      raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
      snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum,
      peerStr, cfgStr);
}
382

383
// Log a received sync timer message via sNTrace, including the time elapsed since the
// message's timestamp. Returns immediately when DEBUG_TRACE is not set in sDebugFlag.
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  if ((sDebugFlag & DEBUG_TRACE) == 0) {
    return;
  }

  int64_t elapsedMs = taosGetTimestampMs() - pMsg->timeStamp;
  sNTrace(
      pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
      syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, elapsedMs, pMsg->data,
      s);
}
392

393
// Log a received local command (numeric cmd, its name from syncLocalCmdGetStr, the term
// and commit index it carries) via sNTrace. s is appended as a free-form suffix.
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
          pMsg->cmd, syncLocalCmdGetStr(pMsg->cmd),
          pMsg->currentTerm, pMsg->commitIndex,
          s);
}
38,578✔
397

398
// Log an outgoing append-entries reply via sNTrace; the dnode printed is DID() of the
// message's destination id. s is appended as a free-form suffix.
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "send sync-append-entries-reply to dnode:%d, {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          DID(&pMsg->destId),
          pMsg->term, pMsg->lastMatchTerm,
          pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex,
          s);
}
×
404

405
// Log an incoming append-entries reply via sNTrace; the dnode printed is DID() of the
// message's source id. s is appended as a free-form suffix.
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from dnode:%d {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          DID(&pMsg->srcId),
          pMsg->term, pMsg->lastMatchTerm,
          pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex,
          s);
}
×
411

412
// Log an outgoing heartbeat via sNTrace. With printX set the line ends in ", x";
// otherwise it ends with the timer-elapsed and next-exec values passed by the caller.
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
                          int64_t execTime) {
  if (!printX) {
    sNTrace(pSyncNode,
            "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
            DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
            execTime);
    return;
  }

  sNTrace(pSyncNode,
          "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, x",
          DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
}
42,614✔
427

428
// Log an incoming heartbeat via sNTrace. When timeDiff exceeds SYNC_HEARTBEAT_SLOW_MS the
// node's hbSlowNum counter is incremented and an extra "slow" line is logged first.
// NOTE(review): on the slow path both the "slow" line and the regular line are emitted;
// confirm whether the double logging is intended.
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
  const bool isSlow = (timeDiff > SYNC_HEARTBEAT_SLOW_MS);

  if (isSlow) {
    pSyncNode->hbSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
            DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
  }

  // regular line, logged for every received heartbeat
  sNTrace(pSyncNode,
          "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
          DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}
38,729✔
443

444
// Log an outgoing heartbeat reply via sNTrace. The dnode printed is DID() of the message's
// destination id, so the message reads "to dnode" like the other send-side loggers in
// this file (it previously said "from dnode" while printing the destination).
// s is appended as a free-form suffix.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  sNTrace(pSyncNode, "send sync-heartbeat-reply to dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
          DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
}
×
448

449
// Log an incoming heartbeat reply via sNTrace. When timeDiff exceeds
// SYNC_HEARTBEAT_REPLY_SLOW_MS the node's hbrSlowNum counter is incremented and an extra
// "slow" line is logged first.
// NOTE(review): on the slow path both the "slow" line and the regular line are emitted;
// confirm whether the double logging is intended.
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s) {
  const bool isSlow = (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS);

  if (isSlow) {
    pSyncNode->hbrSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
  }

  // regular line, logged for every received reply
  sNTrace(pSyncNode,
          "recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
          DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
}
38,192✔
462

463
// Log an outgoing snapshot-send message via sNDebug; the dnode printed is DID() of the
// message's destination id. s is printed right after the dnode.
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->destId), s,
          pMsg->seq, pMsg->term,
          pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
×
470

471
// Log an incoming snapshot-send message via sNDebug, including its data length; the dnode
// printed is DID() of the message's source id. s is printed right after the dnode.
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64 ", data-len:%u",
          DID(&pMsg->srcId), s,
          pMsg->seq, pMsg->term,
          pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime, pMsg->dataLen);
}
×
478

479
// Log an outgoing snapshot response via sNDebug; the dnode printed is DID() of the
// message's destination id. s is printed right after the dnode.
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "send sync-snapshot-rsp to dnode:%d, %s, acked:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->destId), s,
          pMsg->ack, pMsg->term,
          pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
67✔
486

487
// Log an incoming snapshot response via sNDebug; the dnode printed is DID() of the
// message's source id. s is printed right after the dnode.
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  sNDebug(pSyncNode,
          "recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          DID(&pMsg->srcId), s,
          pMsg->ack, pMsg->term,
          pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
          pMsg->startTime);
}
×
494

495
// Log an incoming append-entries message via sNTrace; the dnode printed is DID() of the
// message's source id. s is appended as a free-form suffix.
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-append-entries from dnode:%d {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, commit-index:%" PRId64 ", datalen:%d}, %s",
          DID(&pMsg->srcId), pMsg->term,
          pMsg->prevLogIndex, pMsg->prevLogTerm,
          pMsg->commitIndex, pMsg->dataLen,
          s);
}
2✔
501

502
// Log an outgoing append-entries message via sNTrace; the dnode printed is DID() of the
// message's destination id, and the "index" field is printed as prevLogIndex + 1.
// s is appended as a free-form suffix.
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "send sync-append-entries to dnode:%d, {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, index:%" PRId64 ", commit-index:%" PRId64 ", datalen:%d}, %s",
          DID(&pMsg->destId), pMsg->term,
          pMsg->prevLogIndex, pMsg->prevLogTerm,
          (pMsg->prevLogIndex + 1),
          pMsg->commitIndex, pMsg->dataLen,
          s);
}
×
509

510
// Log a request-vote message via sNInfo. opt is printed as the leading word of the line.
// The trailing detail is "granted:<voteGranted>" unless voteGranted is -1, in which case
// errmsg is printed instead.
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* errmsg,
                            const char* opt) {
  char        statusMsg[64];
  const char* detail = errmsg;

  if (voteGranted != -1) {
    snprintf(statusMsg, sizeof(statusMsg), "granted:%d", voteGranted);
    detail = statusMsg;
  }

  sNInfo(pSyncNode,
         "%s sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         opt, DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, detail);
}
4,172✔
519

520
// Log an outgoing request-vote message via sNInfo; the dnode printed is DID() of the
// message's destination id. s is appended as a free-form suffix.
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
  sNInfo(pNode,
         "send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         DID(&pMsg->destId),
         pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm,
         s);
}
×
525

526
// Log an incoming request-vote reply via sNInfo; the dnode printed is DID() of the
// message's source id. s is appended as a free-form suffix.
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  sNInfo(pSyncNode,
         "recv sync-request-vote-reply from dnode:%d {term:%" PRId64 ", grant:%d}, %s",
         DID(&pMsg->srcId),
         pMsg->term, pMsg->voteGranted,
         s);
}
1,967✔
530

531
// Log an outgoing request-vote reply via sNInfo; the dnode printed is DID() of the
// message's destination id. s is appended as a free-form suffix.
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  sNInfo(pSyncNode,
         "send sync-request-vote-reply to dnode:%d {term:%" PRId64 ", grant:%d}, %s",
         DID(&pMsg->destId),
         pMsg->term, pMsg->voteGranted,
         s);
}
2,084✔
535

536
// Resize pSnap->data to size bytes through taosMemoryRealloc.
// Returns 0 on success. On failure pSnap->data is left pointing at the old buffer and
// the current terrno is returned (presumably set by taosMemoryRealloc — TODO confirm).
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
  void* pResized = taosMemoryRealloc(pSnap->data, size);
  if (pResized != NULL) {
    pSnap->data = pResized;
    return 0;
  }
  return terrno;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc