• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/sync/src/syncMessage.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncMessage.h"
18
#include "syncRaftEntry.h"
19
#include "syncRaftStore.h"
20

UNCOV
21
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
×
22
                         SSyncNode* pNode) {
UNCOV
23
  int32_t bytes = sizeof(SyncTimeout);
×
UNCOV
24
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
25
  pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
×
UNCOV
26
  pMsg->contLen = bytes;
×
UNCOV
27
  if (pMsg->pCont == NULL) {
×
28
    return terrno;
×
29
  }
30

UNCOV
31
  SyncTimeout* pTimeout = pMsg->pCont;
×
UNCOV
32
  pTimeout->bytes = bytes;
×
UNCOV
33
  pTimeout->msgType = pMsg->msgType;
×
UNCOV
34
  pTimeout->vgId = pNode->vgId;
×
UNCOV
35
  pTimeout->timeoutType = timeoutType;
×
UNCOV
36
  pTimeout->logicClock = logicClock;
×
UNCOV
37
  pTimeout->timerMS = timerMS;
×
UNCOV
38
  pTimeout->timeStamp = taosGetTimestampMs();
×
UNCOV
39
  pTimeout->data = pNode;
×
UNCOV
40
  return 0;
×
41
}
42

UNCOV
43
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
×
UNCOV
44
  int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
×
45

UNCOV
46
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
47
  if (pMsg->pCont == NULL) {
×
UNCOV
48
    return terrno;
×
49
  }
UNCOV
50
  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
×
UNCOV
51
  pMsg->contLen = bytes;
×
52

UNCOV
53
  SyncClientRequest* pClientRequest = pMsg->pCont;
×
UNCOV
54
  pClientRequest->bytes = bytes;
×
UNCOV
55
  pClientRequest->vgId = vgId;
×
UNCOV
56
  pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
×
UNCOV
57
  pClientRequest->originalRpcType = pOriginal->msgType;
×
UNCOV
58
  pClientRequest->seqNum = seqNum;
×
UNCOV
59
  pClientRequest->isWeak = isWeak;
×
UNCOV
60
  pClientRequest->dataLen = pOriginal->contLen;
×
UNCOV
61
  memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen);
×
62

UNCOV
63
  return 0;
×
64
}
65

66
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
×
67
  int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
×
68
  pMsg->pCont = rpcMallocCont(bytes);
×
69
  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
×
70
  pMsg->contLen = bytes;
×
71
  if (pMsg->pCont == NULL) {
×
72
    return terrno;
×
73
  }
74

75
  SyncClientRequest* pClientRequest = pMsg->pCont;
×
76
  pClientRequest->bytes = bytes;
×
77
  pClientRequest->vgId = vgId;
×
78
  pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
×
79
  pClientRequest->originalRpcType = TDMT_SYNC_NOOP;
×
80
  pClientRequest->dataLen = pEntry->bytes;
×
81
  memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes);
×
82

83
  return 0;
×
84
}
85

UNCOV
86
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
87
  int32_t bytes = sizeof(SyncRequestVote);
×
UNCOV
88
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
89
  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
×
UNCOV
90
  pMsg->contLen = bytes;
×
UNCOV
91
  if (pMsg->pCont == NULL) {
×
92
    return terrno;
×
93
  }
94

UNCOV
95
  SyncRequestVote* pRequestVote = pMsg->pCont;
×
UNCOV
96
  pRequestVote->bytes = bytes;
×
UNCOV
97
  pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE;
×
UNCOV
98
  pRequestVote->vgId = vgId;
×
UNCOV
99
  return 0;
×
100
}
101

UNCOV
102
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
103
  int32_t bytes = sizeof(SyncRequestVoteReply);
×
UNCOV
104
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
105
  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
×
UNCOV
106
  pMsg->contLen = bytes;
×
UNCOV
107
  if (pMsg->pCont == NULL) {
×
108
    return terrno;
×
109
  }
110

UNCOV
111
  SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
×
UNCOV
112
  pRequestVoteReply->bytes = bytes;
×
UNCOV
113
  pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
×
UNCOV
114
  pRequestVoteReply->vgId = vgId;
×
UNCOV
115
  return 0;
×
116
}
117

118
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
×
119
  int32_t bytes = sizeof(SyncAppendEntries) + dataLen;
×
120
  pMsg->pCont = rpcMallocCont(bytes);
×
121
  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
×
122
  pMsg->contLen = bytes;
×
123
  if (pMsg->pCont == NULL) {
×
124
    return terrno;
×
125
  }
126

127
  SyncAppendEntries* pAppendEntries = pMsg->pCont;
×
128
  pAppendEntries->bytes = bytes;
×
129
  pAppendEntries->vgId = vgId;
×
130
  pAppendEntries->msgType = TDMT_SYNC_APPEND_ENTRIES;
×
131
  pAppendEntries->dataLen = dataLen;
×
132
  return 0;
×
133
}
134

UNCOV
135
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
136
  int32_t bytes = sizeof(SyncAppendEntriesReply);
×
UNCOV
137
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
138
  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
×
UNCOV
139
  pMsg->contLen = bytes;
×
UNCOV
140
  if (pMsg->pCont == NULL) {
×
141
    return terrno;
×
142
  }
143

UNCOV
144
  SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
×
UNCOV
145
  pAppendEntriesReply->bytes = bytes;
×
UNCOV
146
  pAppendEntriesReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
×
UNCOV
147
  pAppendEntriesReply->vgId = vgId;
×
UNCOV
148
  return 0;
×
149
}
150

UNCOV
151
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
×
152
                                            SRpcMsg* pRpcMsg) {
UNCOV
153
  uint32_t dataLen = pEntry->bytes;
×
UNCOV
154
  uint32_t bytes = sizeof(SyncAppendEntries) + dataLen;
×
UNCOV
155
  pRpcMsg->contLen = bytes;
×
UNCOV
156
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
×
UNCOV
157
  if (pRpcMsg->pCont == NULL) {
×
158
    return terrno;
×
159
  }
160

UNCOV
161
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
×
UNCOV
162
  pMsg->bytes = pRpcMsg->contLen;
×
UNCOV
163
  pMsg->msgType = pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
×
UNCOV
164
  pMsg->dataLen = dataLen;
×
165

UNCOV
166
  (void)memcpy(pMsg->data, pEntry, dataLen);
×
167

UNCOV
168
  pMsg->prevLogIndex = pEntry->index - 1;
×
UNCOV
169
  pMsg->prevLogTerm = prevLogTerm;
×
UNCOV
170
  pMsg->vgId = pNode->vgId;
×
UNCOV
171
  pMsg->srcId = pNode->myRaftId;
×
UNCOV
172
  pMsg->term = raftStoreGetTerm(pNode);
×
UNCOV
173
  pMsg->commitIndex = pNode->commitIndex;
×
UNCOV
174
  pMsg->privateTerm = 0;
×
UNCOV
175
  return 0;
×
176
}
177

UNCOV
178
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
179
  int32_t bytes = sizeof(SyncHeartbeat);
×
UNCOV
180
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
181
  pMsg->msgType = TDMT_SYNC_HEARTBEAT;
×
UNCOV
182
  pMsg->contLen = bytes;
×
UNCOV
183
  if (pMsg->pCont == NULL) {
×
184
    return terrno;
×
185
  }
186

UNCOV
187
  SyncHeartbeat* pHeartbeat = pMsg->pCont;
×
UNCOV
188
  pHeartbeat->bytes = bytes;
×
UNCOV
189
  pHeartbeat->msgType = TDMT_SYNC_HEARTBEAT;
×
UNCOV
190
  pHeartbeat->vgId = vgId;
×
UNCOV
191
  return 0;
×
192
}
193

UNCOV
194
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
195
  int32_t bytes = sizeof(SyncHeartbeatReply);
×
UNCOV
196
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
197
  pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
×
UNCOV
198
  pMsg->contLen = bytes;
×
UNCOV
199
  if (pMsg->pCont == NULL) {
×
200
    return terrno;
×
201
  }
202

UNCOV
203
  SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
×
UNCOV
204
  pHeartbeatReply->bytes = bytes;
×
UNCOV
205
  pHeartbeatReply->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
×
UNCOV
206
  pHeartbeatReply->vgId = vgId;
×
UNCOV
207
  return 0;
×
208
}
209

UNCOV
210
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
×
UNCOV
211
  int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
×
UNCOV
212
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
213
  pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
×
UNCOV
214
  pMsg->contLen = bytes;
×
UNCOV
215
  if (pMsg->pCont == NULL) {
×
216
    return terrno;
×
217
  }
218

UNCOV
219
  SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
×
UNCOV
220
  pSnapshotSend->bytes = bytes;
×
UNCOV
221
  pSnapshotSend->vgId = vgId;
×
UNCOV
222
  pSnapshotSend->msgType = TDMT_SYNC_SNAPSHOT_SEND;
×
UNCOV
223
  pSnapshotSend->dataLen = dataLen;
×
UNCOV
224
  return 0;
×
225
}
226

UNCOV
227
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
×
UNCOV
228
  int32_t bytes = sizeof(SyncSnapshotRsp) + dataLen;
×
UNCOV
229
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
230
  pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
×
UNCOV
231
  pMsg->contLen = bytes;
×
UNCOV
232
  if (pMsg->pCont == NULL) {
×
233
    return terrno;
×
234
  }
235

UNCOV
236
  SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
×
UNCOV
237
  pPreSnapshotRsp->bytes = bytes;
×
UNCOV
238
  pPreSnapshotRsp->msgType = TDMT_SYNC_SNAPSHOT_RSP;
×
UNCOV
239
  pPreSnapshotRsp->vgId = vgId;
×
UNCOV
240
  return 0;
×
241
}
242

UNCOV
243
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
244
  int32_t bytes = sizeof(SyncLeaderTransfer);
×
UNCOV
245
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
246
  pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
×
UNCOV
247
  pMsg->contLen = bytes;
×
UNCOV
248
  if (pMsg->pCont == NULL) {
×
249
    return terrno;
×
250
  }
251

UNCOV
252
  SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
×
UNCOV
253
  pLeaderTransfer->bytes = bytes;
×
UNCOV
254
  pLeaderTransfer->msgType = TDMT_SYNC_LEADER_TRANSFER;
×
UNCOV
255
  pLeaderTransfer->vgId = vgId;
×
UNCOV
256
  return 0;
×
257
}
258

UNCOV
259
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
×
UNCOV
260
  int32_t bytes = sizeof(SyncLocalCmd);
×
UNCOV
261
  pMsg->pCont = rpcMallocCont(bytes);
×
UNCOV
262
  pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
×
UNCOV
263
  pMsg->contLen = bytes;
×
UNCOV
264
  if (pMsg->pCont == NULL) {
×
265
    return terrno;
×
266
  }
267

UNCOV
268
  SyncLocalCmd* pLocalCmd = pMsg->pCont;
×
UNCOV
269
  pLocalCmd->bytes = bytes;
×
UNCOV
270
  pLocalCmd->msgType = TDMT_SYNC_LOCAL_CMD;
×
UNCOV
271
  pLocalCmd->vgId = vgId;
×
UNCOV
272
  return 0;
×
273
}
274

UNCOV
275
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
×
UNCOV
276
  switch (timerType) {
×
UNCOV
277
    case SYNC_TIMEOUT_PING:
×
UNCOV
278
      return "ping";
×
279
    case SYNC_TIMEOUT_ELECTION:
×
280
      return "elect";
×
281
    case SYNC_TIMEOUT_HEARTBEAT:
×
282
      return "heartbeat";
×
283
    default:
×
284
      return "unknown";
×
285
  }
286
}
287

288
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
×
289
  switch (cmd) {
×
290
    case SYNC_LOCAL_CMD_STEP_DOWN:
×
291
      return "step-down";
×
292
    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
×
293
      return "follower-commit";
×
294
    default:
×
295
      return "unknown-local-cmd";
×
296
  }
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc