• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.64
/source/libs/sync/src/syncMessage.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncMessage.h"
18
#include "syncRaftEntry.h"
19
#include "syncRaftStore.h"
20

21
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
7,618,458✔
22
                         SSyncNode* pNode) {
23
  int32_t bytes = sizeof(SyncTimeout);
7,618,458✔
24
  pMsg->pCont = rpcMallocCont(bytes);
7,618,458✔
25
  pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
7,618,458✔
26
  pMsg->contLen = bytes;
7,618,458✔
27
  if (pMsg->pCont == NULL) {
7,618,458!
28
    return terrno;
×
29
  }
30

31
  SyncTimeout* pTimeout = pMsg->pCont;
7,618,458✔
32
  pTimeout->bytes = bytes;
7,618,458✔
33
  pTimeout->msgType = pMsg->msgType;
7,618,458✔
34
  pTimeout->vgId = pNode->vgId;
7,618,458✔
35
  pTimeout->timeoutType = timeoutType;
7,618,458✔
36
  pTimeout->logicClock = logicClock;
7,618,458✔
37
  pTimeout->timerMS = timerMS;
7,618,458✔
38
  pTimeout->timeStamp = taosGetTimestampMs();
7,618,458✔
39
  pTimeout->data = pNode;
7,618,458✔
40
  return 0;
7,618,458✔
41
}
42

43
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
9,548,661✔
44
  int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
9,548,661✔
45

46
  pMsg->pCont = rpcMallocCont(bytes);
9,548,661✔
47
  if (pMsg->pCont == NULL) {
9,548,661!
48
    return terrno;
×
49
  }
50
  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
9,548,661✔
51
  pMsg->contLen = bytes;
9,548,661✔
52

53
  SyncClientRequest* pClientRequest = pMsg->pCont;
9,548,661✔
54
  pClientRequest->bytes = bytes;
9,548,661✔
55
  pClientRequest->vgId = vgId;
9,548,661✔
56
  pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
9,548,439✔
57
  pClientRequest->originalRpcType = pOriginal->msgType;
9,548,661✔
58
  pClientRequest->seqNum = seqNum;
9,548,661✔
59
  pClientRequest->isWeak = isWeak;
9,548,661✔
60
  pClientRequest->dataLen = pOriginal->contLen;
9,548,439✔
61
  memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen);
9,548,439!
62

63
  return 0;
9,548,661✔
64
}
65

66
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
×
67
  int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
×
68
  pMsg->pCont = rpcMallocCont(bytes);
×
69
  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
×
70
  pMsg->contLen = bytes;
×
71
  if (pMsg->pCont == NULL) {
×
72
    return terrno;
×
73
  }
74

75
  SyncClientRequest* pClientRequest = pMsg->pCont;
×
76
  pClientRequest->bytes = bytes;
×
77
  pClientRequest->vgId = vgId;
×
78
  pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
×
79
  pClientRequest->originalRpcType = TDMT_SYNC_NOOP;
×
80
  pClientRequest->dataLen = pEntry->bytes;
×
81
  memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes);
×
82

83
  return 0;
×
84
}
85

86
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
220,039✔
87
  int32_t bytes = sizeof(SyncRequestVote);
220,039✔
88
  pMsg->pCont = rpcMallocCont(bytes);
220,039✔
89
  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
220,039✔
90
  pMsg->contLen = bytes;
220,039✔
91
  if (pMsg->pCont == NULL) {
220,039!
92
    return terrno;
×
93
  }
94

95
  SyncRequestVote* pRequestVote = pMsg->pCont;
220,039✔
96
  pRequestVote->bytes = bytes;
220,039✔
97
  pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE;
220,039✔
98
  pRequestVote->vgId = vgId;
220,039✔
99
  return 0;
220,039✔
100
}
101

102
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
261,579✔
103
  int32_t bytes = sizeof(SyncRequestVoteReply);
261,579✔
104
  pMsg->pCont = rpcMallocCont(bytes);
261,579✔
105
  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
261,579✔
106
  pMsg->contLen = bytes;
261,579✔
107
  if (pMsg->pCont == NULL) {
261,579!
108
    return terrno;
×
109
  }
110

111
  SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
261,579✔
112
  pRequestVoteReply->bytes = bytes;
261,579✔
113
  pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
261,579✔
114
  pRequestVoteReply->vgId = vgId;
261,579✔
115
  return 0;
261,579✔
116
}
117

118
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
×
119
  int32_t bytes = sizeof(SyncAppendEntries) + dataLen;
×
120
  pMsg->pCont = rpcMallocCont(bytes);
×
121
  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
×
122
  pMsg->contLen = bytes;
×
123
  if (pMsg->pCont == NULL) {
×
124
    return terrno;
×
125
  }
126

127
  SyncAppendEntries* pAppendEntries = pMsg->pCont;
×
128
  pAppendEntries->bytes = bytes;
×
129
  pAppendEntries->vgId = vgId;
×
130
  pAppendEntries->msgType = TDMT_SYNC_APPEND_ENTRIES;
×
131
  pAppendEntries->dataLen = dataLen;
×
132
  return 0;
×
133
}
134

135
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
7,469,643✔
136
  int32_t bytes = sizeof(SyncAppendEntriesReply);
7,469,643✔
137
  pMsg->pCont = rpcMallocCont(bytes);
7,469,643✔
138
  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
7,469,643✔
139
  pMsg->contLen = bytes;
7,469,643✔
140
  if (pMsg->pCont == NULL) {
7,469,643!
141
    return terrno;
×
142
  }
143

144
  SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
7,469,643✔
145
  pAppendEntriesReply->bytes = bytes;
7,469,643✔
146
  pAppendEntriesReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
7,469,643✔
147
  pAppendEntriesReply->vgId = vgId;
7,469,643✔
148
  return 0;
7,469,643✔
149
}
150

151
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
7,603,451✔
152
                                            SRpcMsg* pRpcMsg) {
153
  uint32_t dataLen = pEntry->bytes;
7,603,451✔
154
  uint32_t bytes = sizeof(SyncAppendEntries) + dataLen;
7,603,673✔
155
  pRpcMsg->info.traceId = pEntry->originRpcTraceId;
7,603,673✔
156
  pRpcMsg->contLen = bytes;
7,602,479✔
157
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
7,602,953✔
158
  if (pRpcMsg->pCont == NULL) {
7,602,817!
159
    return terrno;
×
160
  }
161

162
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
7,603,157✔
163
  pMsg->bytes = pRpcMsg->contLen;
7,602,995✔
164
  pMsg->msgType = pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
7,603,334✔
165
  pMsg->dataLen = dataLen;
7,603,673✔
166

167
  (void)memcpy(pMsg->data, pEntry, dataLen);
7,603,292!
168

169
  pMsg->prevLogIndex = pEntry->index - 1;
7,603,142✔
170
  pMsg->prevLogTerm = prevLogTerm;
7,603,673✔
171
  pMsg->vgId = pNode->vgId;
7,603,673✔
172
  pMsg->srcId = pNode->myRaftId;
7,602,791✔
173
  pMsg->term = raftStoreGetTerm(pNode);
7,603,673✔
174
  pMsg->commitIndex = pNode->commitIndex;
7,603,673✔
175
  pMsg->privateTerm = 0;
7,603,673✔
176
  return 0;
7,603,157✔
177
}
178

179
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
7,197,256✔
180
  int32_t bytes = sizeof(SyncHeartbeat);
7,197,256✔
181
  pMsg->pCont = rpcMallocCont(bytes);
7,197,256✔
182
  pMsg->msgType = TDMT_SYNC_HEARTBEAT;
7,197,256✔
183
  pMsg->contLen = bytes;
7,197,256✔
184
  if (pMsg->pCont == NULL) {
7,197,256!
185
    return terrno;
×
186
  }
187

188
  SyncHeartbeat* pHeartbeat = pMsg->pCont;
7,197,256✔
189
  pHeartbeat->bytes = bytes;
7,197,256✔
190
  pHeartbeat->msgType = TDMT_SYNC_HEARTBEAT;
7,197,256✔
191
  pHeartbeat->vgId = vgId;
7,197,256✔
192
  return 0;
7,197,256✔
193
}
194

195
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
6,239,250✔
196
  int32_t bytes = sizeof(SyncHeartbeatReply);
6,239,250✔
197
  pMsg->pCont = rpcMallocCont(bytes);
6,239,250✔
198
  pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
6,238,248✔
199
  pMsg->contLen = bytes;
6,238,248✔
200
  if (pMsg->pCont == NULL) {
6,238,248!
201
    return terrno;
×
202
  }
203

204
  SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
6,238,691✔
205
  pHeartbeatReply->bytes = bytes;
6,238,869✔
206
  pHeartbeatReply->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
6,239,290✔
207
  pHeartbeatReply->vgId = vgId;
6,238,852✔
208
  return 0;
6,238,997✔
209
}
210

211
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
5,058✔
212
  int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
5,058✔
213
  pMsg->pCont = rpcMallocCont(bytes);
5,058✔
214
  pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
5,058✔
215
  pMsg->contLen = bytes;
5,058✔
216
  if (pMsg->pCont == NULL) {
5,058!
217
    return terrno;
×
218
  }
219

220
  SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
5,058✔
221
  pSnapshotSend->bytes = bytes;
5,058✔
222
  pSnapshotSend->vgId = vgId;
5,058✔
223
  pSnapshotSend->msgType = TDMT_SYNC_SNAPSHOT_SEND;
5,058✔
224
  pSnapshotSend->dataLen = dataLen;
5,058✔
225
  return 0;
5,058✔
226
}
227

228
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
5,058✔
229
  int32_t bytes = sizeof(SyncSnapshotRsp) + dataLen;
5,058✔
230
  pMsg->pCont = rpcMallocCont(bytes);
5,058✔
231
  pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
5,058✔
232
  pMsg->contLen = bytes;
5,058✔
233
  if (pMsg->pCont == NULL) {
5,058!
234
    return terrno;
×
235
  }
236

237
  SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
5,058✔
238
  pPreSnapshotRsp->bytes = bytes;
5,058✔
239
  pPreSnapshotRsp->msgType = TDMT_SYNC_SNAPSHOT_RSP;
5,058✔
240
  pPreSnapshotRsp->vgId = vgId;
5,058✔
241
  return 0;
5,058✔
242
}
243

244
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
85,548✔
245
  int32_t bytes = sizeof(SyncLeaderTransfer);
85,548✔
246
  pMsg->pCont = rpcMallocCont(bytes);
85,548✔
247
  pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
85,548✔
248
  pMsg->contLen = bytes;
85,548✔
249
  if (pMsg->pCont == NULL) {
85,548!
250
    return terrno;
×
251
  }
252

253
  SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
85,548✔
254
  pLeaderTransfer->bytes = bytes;
85,548✔
255
  pLeaderTransfer->msgType = TDMT_SYNC_LEADER_TRANSFER;
85,548✔
256
  pLeaderTransfer->vgId = vgId;
85,548✔
257
  return 0;
85,548✔
258
}
259

260
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
6,138,338✔
261
  int32_t bytes = sizeof(SyncLocalCmd);
6,138,338✔
262
  pMsg->pCont = rpcMallocCont(bytes);
6,138,338✔
263
  pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
6,137,694✔
264
  pMsg->contLen = bytes;
6,137,477✔
265
  if (pMsg->pCont == NULL) {
6,137,694!
266
    return terrno;
×
267
  }
268

269
  SyncLocalCmd* pLocalCmd = pMsg->pCont;
6,137,919✔
270
  pLocalCmd->bytes = bytes;
6,137,715✔
271
  pLocalCmd->msgType = TDMT_SYNC_LOCAL_CMD;
6,137,715✔
272
  pLocalCmd->vgId = vgId;
6,137,715✔
273
  return 0;
6,137,694✔
274
}
275

UNCOV
276
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
×
UNCOV
277
  switch (timerType) {
×
UNCOV
278
    case SYNC_TIMEOUT_PING:
×
UNCOV
279
      return "ping";
×
280
    case SYNC_TIMEOUT_ELECTION:
×
281
      return "elect";
×
282
    case SYNC_TIMEOUT_HEARTBEAT:
×
283
      return "heartbeat";
×
284
    default:
×
285
      return "unknown";
×
286
  }
287
}
288

289
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
×
290
  switch (cmd) {
×
291
    case SYNC_LOCAL_CMD_STEP_DOWN:
×
292
      return "step-down";
×
293
    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
×
294
      return "follower-commit";
×
295
    default:
×
296
      return "unknown-local-cmd";
×
297
  }
298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc