• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5050

12 May 2026 05:36AM UTC coverage: 73.398% (+0.09%) from 73.313%
#5050

push

travis-ci

web-flow
merge: from main to 3.0 branch #35319

90 of 101 new or added lines in 2 files covered. (89.11%)

489 existing lines in 125 files now uncovered.

281602 of 383662 relevant lines covered (73.4%)

138099127.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.61
/source/libs/sync/src/syncAppendEntries.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncAppendEntries.h"
18
#include "syncPipeline.h"
19
#include "syncMessage.h"
20
#include "syncRaftLog.h"
21
#include "syncRaftStore.h"
22
#include "syncReplication.h"
23
#include "syncUtil.h"
24
#include "syncCommit.h"
25

26
// TLA+ Spec
27
// HandleAppendEntriesRequest(i, j, m) ==
28
//    LET logOk == \/ m.mprevLogIndex = 0
29
//                 \/ /\ m.mprevLogIndex > 0
30
//                    /\ m.mprevLogIndex <= Len(log[i])
31
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
32
//    IN /\ m.mterm <= currentTerm[i]
33
//       /\ \/ /\ \* reject request
34
//                \/ m.mterm < currentTerm[i]
35
//                \/ /\ m.mterm = currentTerm[i]
36
//                   /\ state[i] = Follower
37
//                   /\ \lnot logOk
38
//             /\ Reply([mtype           |-> AppendEntriesResponse,
39
//                       mterm           |-> currentTerm[i],
40
//                       msuccess        |-> FALSE,
41
//                       mmatchIndex     |-> 0,
42
//                       msource         |-> i,
43
//                       mdest           |-> j],
44
//                       m)
45
//             /\ UNCHANGED <<serverVars, logVars>>
46
//          \/ \* return to follower state
47
//             /\ m.mterm = currentTerm[i]
48
//             /\ state[i] = Candidate
49
//             /\ state' = [state EXCEPT ![i] = Follower]
50
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
51
//          \/ \* accept request
52
//             /\ m.mterm = currentTerm[i]
53
//             /\ state[i] = Follower
54
//             /\ logOk
55
//             /\ LET index == m.mprevLogIndex + 1
56
//                IN \/ \* already done with request
57
//                       /\ \/ m.mentries = << >>
58
//                          \/ /\ m.mentries /= << >>
59
//                             /\ Len(log[i]) >= index
60
//                             /\ log[i][index].term = m.mentries[1].term
61
//                          \* This could make our commitIndex decrease (for
62
//                          \* example if we process an old, duplicated request),
63
//                          \* but that doesn't really affect anything.
64
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
65
//                                              m.mcommitIndex]
66
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
67
//                                 mterm           |-> currentTerm[i],
68
//                                 msuccess        |-> TRUE,
69
//                                 mmatchIndex     |-> m.mprevLogIndex +
70
//                                                     Len(m.mentries),
71
//                                 msource         |-> i,
72
//                                 mdest           |-> j],
73
//                                 m)
74
//                       /\ UNCHANGED <<serverVars, log>>
75
//                   \/ \* conflict: remove 1 entry
76
//                       /\ m.mentries /= << >>
77
//                       /\ Len(log[i]) >= index
78
//                       /\ log[i][index].term /= m.mentries[1].term
79
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
80
//                                          log[i][index2]]
81
//                          IN log' = [log EXCEPT ![i] = new]
82
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
83
//                   \/ \* no conflict: append entry
84
//                       /\ m.mentries /= << >>
85
//                       /\ Len(log[i]) = m.mprevLogIndex
86
//                       /\ log' = [log EXCEPT ![i] =
87
//                                      Append(log[i], m.mentries[1])]
88
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
89
//       /\ UNCHANGED <<candidateVars, leaderVars>>
90
//
91

92
/*
 * Handle an AppendEntries request received by this sync node.
 *
 * Builds an AppendEntries reply, hands the entry carried by the message to the
 * log buffer, sends the reply back to the sender, and finally asks the log
 * buffer to commit up to ths->commitIndex.
 *
 * ths:     the local sync node that received the message.
 * pRpcMsg: the RPC message; its pCont is read as a SyncAppendEntries.
 *
 * Two exits exist:
 *   _SEND_RESPONSE  a reply is sent (success is true only if the entry was
 *                   accepted and the log buffer's match index reached it).
 *   _IGNORE         no reply is sent; the reply buffer and the entry are
 *                   released and 0 is returned.
 *
 * Returns 0 on every path written out in this function. The send call is
 * wrapped in TAOS_CHECK_RETURN, which presumably returns the error code of a
 * failed send to the caller -- TODO confirm against the macro definition.
 */
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
579,085,523✔
93
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
579,085,523✔
94
  SRpcMsg            rpcRsp = {0};
579,086,919✔
95
  // becomes true only after syncLogBufferAccept() succeeds
  bool               accepted = false;
579,086,919✔
96
  SSyncRaftEntry*    pEntry = NULL;
579,086,919✔
97
  // set when syncNodeStepDown() was called; triggers the election timer reset before returning
  bool               resetElect = false;
579,086,919✔
98

99
  // ignore the message when the sender is not in this node's raft group
  // (e.g. the replica has already been dropped)
100
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
579,086,919✔
101
    syncLogRecvAppendEntries(ths, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
102
    goto _IGNORE;
×
103
  }
104

105
  // bump the node's receive counter; a non-positive result is only logged, processing continues
  int64_t nRef = atomic_add_fetch_64(&ths->recvCount, 1);
579,087,031✔
106
  if (nRef <= 0) {
579,087,031✔
107
    sError("vgId:%d, recv count is %" PRId64, ths->vgId, nRef);
×
108
  }
109

110
  // build the reply message; if that fails the request is ignored without a reply
  int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
579,087,031✔
111
  if (code != 0) {
579,087,031✔
112
    syncLogRecvAppendEntries(ths, pMsg, "build rsp error", &pRpcMsg->info.traceId);
×
113
    goto _IGNORE;
×
114
  }
115
  // the reply carries the trace id of the request
  rpcRsp.info.traceId = pRpcMsg->info.traceId;
579,087,031✔
116

117
  SyncAppendEntriesReply* pReply = rpcRsp.pCont;
579,087,031✔
118
  // prepare the response with "rejected" defaults: success is false and matchIndex is invalid
  // until _SEND_RESPONSE fills them in; lastSendIndex is the index the carried entry must have
  // (prevLogIndex + 1, checked against pEntry->index below)
119
  pReply->srcId = ths->myRaftId;
579,087,031✔
120
  pReply->destId = pMsg->srcId;
579,086,660✔
121
  pReply->term = raftStoreGetTerm(ths);
579,085,762✔
122
  pReply->success = false;
579,087,031✔
123
  pReply->matchIndex = SYNC_INDEX_INVALID;
579,087,031✔
124
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
579,087,031✔
125
  pReply->startTime = ths->startTime;
579,085,849✔
126
  pReply->appliedIndex = ths->pFsm->FpAppliedIndexCb(ths->pFsm);
579,086,660✔
127

128
  // sender's term is older than ours: reply right away; accepted stays false, so success stays false
  if (pMsg->term < raftStoreGetTerm(ths)) {
579,087,031✔
129
    goto _SEND_RESPONSE;
487✔
130
  }
131

132
  // sender's term is newer than ours: the reply reports the sender's term
  if (pMsg->term > raftStoreGetTerm(ths)) {
579,086,544✔
133
    pReply->term = pMsg->term;
889,935✔
134
  }
135

136
  // a node whose configured role is not learner steps down with the sender's term
  // and remembers to reset its election timer before returning
  if(ths->raftCfg.cfg.nodeInfo[ths->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){
579,086,173✔
137
    syncNodeStepDown(ths, pMsg->term, pMsg->srcId, "appendEntry");
485,838,476✔
138
    resetElect = true;
485,838,320✔
139
  }
140

141
  // the payload must be at least as large as a SSyncRaftEntry, otherwise the message is ignored
  if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
579,085,398✔
142
    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
×
143
           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
144
    goto _IGNORE;
×
145
  }
146

147
  // extract the raft entry from the message; from here on _IGNORE also destroys pEntry
  pEntry = syncEntryBuildFromAppendEntries(pMsg);
579,084,633✔
148
  if (pEntry == NULL) {
579,085,925✔
149
    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
×
150
    goto _IGNORE;
×
151
  }
152

153
  // sanity check: the entry must directly follow prevLogIndex and must not have a negative term
  if (pMsg->prevLogIndex + 1 != pEntry->index || pEntry->term < 0) {
579,085,925✔
UNCOV
154
    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ",  term:%" PRId64 ", prevLogIndex:%" PRId64
×
155
           ", prevLogTerm:%" PRId64,
156
           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
157
    goto _IGNORE;
×
158
  }
159

160
  sGDebug(&pRpcMsg->info.traceId,
579,085,554✔
161
          "vgId:%d, index:%" PRId64 ", recv append entries msg, term:%" PRId64 ", preLogIndex:%" PRId64
162
          ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
163
          pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
164
          pEntry->term);
165

166
  // while the FSM state is incomplete the entry is not accepted: report the state in the reply,
  // destroy the entry here and still send a (non-success) response
  if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
579,085,554✔
167
    pReply->fsmState = ths->fsmState;
×
168
    sWarn("vgId:%d, unable to accept, due to incomplete fsm state. index:%" PRId64, ths->vgId, pEntry->index);
×
169
    syncEntryDestroy(pEntry);
×
170
    goto _SEND_RESPONSE;
×
171
  }
172

173
  // hand the entry to the log buffer; a negative result still sends a reply, with accepted == false
  // NOTE(review): pEntry is not destroyed on either outcome and is set to NULL at _SEND_RESPONSE,
  // so syncLogBufferAccept() presumably takes ownership of it even on failure -- confirm
174
  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
579,084,352✔
175
    goto _SEND_RESPONSE;
369,974✔
176
  }
177
  accepted = true;
578,714,068✔
178

179
_SEND_RESPONSE:
579,084,529✔
180
  // this function no longer references the entry past this point
  pEntry = NULL;
579,084,529✔
181
  // matchIndex (and lastMatchTerm) in the reply come from the log buffer
  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn", pRpcMsg);
579,084,529✔
182
  // matched: the log buffer's match index has reached the index of the entry in this message
  bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
579,083,374✔
183
  if (accepted && matched) {
579,084,690✔
184
    pReply->success = true;
524,678,822✔
185
    // update commit index only after matching, and never beyond the index of the entry just received
186
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex));
524,682,415✔
187
    sGDebug(&rpcRsp.info.traceId, "vgId:%d, index:%" PRId64 ", last commit index:%" PRId64, ths->vgId,
524,675,066✔
188
           pMsg->prevLogIndex + 1, returnIndex);
189
  }
190

191
  // presumably stamps the reply's trace id with a newly generated message id -- TODO confirm
  TRACE_SET_MSGID(&(rpcRsp.info.traceId), tGenIdPI64());
579,080,934✔
192
  {
193
    sGDebug(&rpcRsp.info.traceId,
579,072,998✔
194
            "vgId:%d, index:%" PRId64 ", send append entries reply, matchIndex:%" PRId64 " term:%" PRId64
195
            " lastSendIndex:%" PRId64 " to dest addr:0x%016" PRIx64,
196
            ths->vgId, pMsg->prevLogIndex + 1, pReply->matchIndex, pReply->term, pReply->lastSendIndex,
197
            pReply->destId.addr);
198
  }
199
  // ack, i.e. send the response to the node the request came from
  // NOTE(review): TAOS_CHECK_RETURN presumably returns early on a non-zero code, which would skip
  // the commit and the election timer reset below -- confirm that this is intended
200
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp));
579,072,998✔
201

202
  // commit up to ths->commitIndex (possibly advanced above from the sender's commit index);
  // skipped while the FSM state is incomplete, and a failure is only logged
203
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
1,158,172,697✔
204
      syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "sync-append-entries") < 0) {
579,087,031✔
205
    sGError(&pRpcMsg->info.traceId, "vgId:%d, failed to commit raft fsm log since %s", ths->vgId, terrstr());
24,308✔
206
  }
207

208
  // only nodes that stepped down above reset their election timer
  if (resetElect) syncNodeResetElectTimer(ths);
579,086,057✔
209
  return 0;
579,087,031✔
210

211
  // drop the request without replying: release the reply buffer and the entry, then report success.
  // On the earliest paths rpcRsp.pCont and pEntry are still NULL; both helpers presumably accept
  // NULL -- TODO confirm
_IGNORE:
×
212
  rpcFreeCont(rpcRsp.pCont);
×
213
  syncEntryDestroy(pEntry);
×
214
  return 0;
×
215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc