• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5047

07 May 2026 07:27AM UTC coverage: 73.152% (+0.008%) from 73.144%
#5047

push

travis-ci

web-flow
feat(parser): add support for QUARTER duration alias and related tests (#35268)

6 of 6 new or added lines in 2 files covered. (100.0%)

597 existing lines in 127 files now uncovered.

277691 of 379607 relevant lines covered (73.15%)

132499712.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.61
/source/libs/sync/src/syncAppendEntries.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncAppendEntries.h"
18
#include "syncPipeline.h"
19
#include "syncMessage.h"
20
#include "syncRaftLog.h"
21
#include "syncRaftStore.h"
22
#include "syncReplication.h"
23
#include "syncUtil.h"
24
#include "syncCommit.h"
25

26
// TLA+ Spec
27
// HandleAppendEntriesRequest(i, j, m) ==
28
//    LET logOk == \/ m.mprevLogIndex = 0
29
//                 \/ /\ m.mprevLogIndex > 0
30
//                    /\ m.mprevLogIndex <= Len(log[i])
31
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
32
//    IN /\ m.mterm <= currentTerm[i]
33
//       /\ \/ /\ \* reject request
34
//                \/ m.mterm < currentTerm[i]
35
//                \/ /\ m.mterm = currentTerm[i]
36
//                   /\ state[i] = Follower
37
//                   /\ \lnot logOk
38
//             /\ Reply([mtype           |-> AppendEntriesResponse,
39
//                       mterm           |-> currentTerm[i],
40
//                       msuccess        |-> FALSE,
41
//                       mmatchIndex     |-> 0,
42
//                       msource         |-> i,
43
//                       mdest           |-> j],
44
//                       m)
45
//             /\ UNCHANGED <<serverVars, logVars>>
46
//          \/ \* return to follower state
47
//             /\ m.mterm = currentTerm[i]
48
//             /\ state[i] = Candidate
49
//             /\ state' = [state EXCEPT ![i] = Follower]
50
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
51
//          \/ \* accept request
52
//             /\ m.mterm = currentTerm[i]
53
//             /\ state[i] = Follower
54
//             /\ logOk
55
//             /\ LET index == m.mprevLogIndex + 1
56
//                IN \/ \* already done with request
57
//                       /\ \/ m.mentries = << >>
58
//                          \/ /\ m.mentries /= << >>
59
//                             /\ Len(log[i]) >= index
60
//                             /\ log[i][index].term = m.mentries[1].term
61
//                          \* This could make our commitIndex decrease (for
62
//                          \* example if we process an old, duplicated request),
63
//                          \* but that doesn't really affect anything.
64
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
65
//                                              m.mcommitIndex]
66
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
67
//                                 mterm           |-> currentTerm[i],
68
//                                 msuccess        |-> TRUE,
69
//                                 mmatchIndex     |-> m.mprevLogIndex +
70
//                                                     Len(m.mentries),
71
//                                 msource         |-> i,
72
//                                 mdest           |-> j],
73
//                                 m)
74
//                       /\ UNCHANGED <<serverVars, log>>
75
//                   \/ \* conflict: remove 1 entry
76
//                       /\ m.mentries /= << >>
77
//                       /\ Len(log[i]) >= index
78
//                       /\ log[i][index].term /= m.mentries[1].term
79
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
80
//                                          log[i][index2]]
81
//                          IN log' = [log EXCEPT ![i] = new]
82
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
83
//                   \/ \* no conflict: append entry
84
//                       /\ m.mentries /= << >>
85
//                       /\ Len(log[i]) = m.mprevLogIndex
86
//                       /\ log' = [log EXCEPT ![i] =
87
//                                      Append(log[i], m.mentries[1])]
88
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
89
//       /\ UNCHANGED <<candidateVars, leaderVars>>
90
//
91

92
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
112,410,504✔
93
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
112,410,504✔
94
  SRpcMsg            rpcRsp = {0};
112,410,504✔
95
  bool               accepted = false;
112,410,504✔
96
  SSyncRaftEntry*    pEntry = NULL;
112,410,504✔
97
  bool               resetElect = false;
112,410,504✔
98

99
  // if already drop replica, do not process
100
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
112,410,504✔
101
    syncLogRecvAppendEntries(ths, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
102
    goto _IGNORE;
×
103
  }
104

105
  int64_t nRef = atomic_add_fetch_64(&ths->recvCount, 1);
112,410,504✔
106
  if (nRef <= 0) {
112,410,504✔
107
    sError("vgId:%d, recv count is %" PRId64, ths->vgId, nRef);
×
108
  }
109

110
  int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
112,410,504✔
111
  if (code != 0) {
112,410,504✔
112
    syncLogRecvAppendEntries(ths, pMsg, "build rsp error", &pRpcMsg->info.traceId);
×
113
    goto _IGNORE;
×
114
  }
115
  rpcRsp.info.traceId = pRpcMsg->info.traceId;
112,410,504✔
116

117
  SyncAppendEntriesReply* pReply = rpcRsp.pCont;
112,410,504✔
118
  // prepare response msg
119
  pReply->srcId = ths->myRaftId;
112,410,504✔
120
  pReply->destId = pMsg->srcId;
112,410,504✔
121
  pReply->term = raftStoreGetTerm(ths);
112,410,180✔
122
  pReply->success = false;
112,410,186✔
123
  pReply->matchIndex = SYNC_INDEX_INVALID;
112,410,186✔
124
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
112,409,569✔
125
  pReply->startTime = ths->startTime;
112,410,186✔
126
  pReply->appliedIndex = ths->pFsm->FpAppliedIndexCb(ths->pFsm);
112,410,186✔
127

128
  if (pMsg->term < raftStoreGetTerm(ths)) {
112,410,504✔
129
    goto _SEND_RESPONSE;
1,480✔
130
  }
131

132
  if (pMsg->term > raftStoreGetTerm(ths)) {
112,409,024✔
133
    pReply->term = pMsg->term;
193,631✔
134
  }
135

136
  if(ths->raftCfg.cfg.nodeInfo[ths->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){
112,408,243✔
137
    syncNodeStepDown(ths, pMsg->term, pMsg->srcId, "appendEntry");
98,791,545✔
138
    resetElect = true;
98,790,440✔
139
  }
140

141
  if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
112,407,323✔
142
    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
×
143
           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
144
    goto _IGNORE;
×
145
  }
146

147
  pEntry = syncEntryBuildFromAppendEntries(pMsg);
112,407,919✔
148
  if (pEntry == NULL) {
112,407,919✔
149
    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
×
150
    goto _IGNORE;
×
151
  }
152

153
  if (pMsg->prevLogIndex + 1 != pEntry->index || pEntry->term < 0) {
112,407,919✔
UNCOV
154
    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ",  term:%" PRId64 ", prevLogIndex:%" PRId64
×
155
           ", prevLogTerm:%" PRId64,
156
           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
157
    goto _IGNORE;
×
158
  }
159

160
  sGDebug(&pRpcMsg->info.traceId,
112,406,943✔
161
          "vgId:%d, index:%" PRId64 ", recv append entries msg, term:%" PRId64 ", preLogIndex:%" PRId64
162
          ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
163
          pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
164
          pEntry->term);
165

166
  if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
112,406,943✔
167
    pReply->fsmState = ths->fsmState;
×
168
    sWarn("vgId:%d, unable to accept, due to incomplete fsm state. index:%" PRId64, ths->vgId, pEntry->index);
×
169
    syncEntryDestroy(pEntry);
×
170
    goto _SEND_RESPONSE;
×
171
  }
172

173
  // accept
174
  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
112,406,943✔
175
    goto _SEND_RESPONSE;
338,017✔
176
  }
177
  accepted = true;
112,070,316✔
178

179
_SEND_RESPONSE:
112,409,813✔
180
  pEntry = NULL;
112,409,813✔
181
  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn", pRpcMsg);
112,409,813✔
182
  bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
112,406,657✔
183
  if (accepted && matched) {
112,405,947✔
184
    pReply->success = true;
107,219,153✔
185
    // update commit index only after matching
186
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex));
107,221,398✔
187
    sGDebug(&rpcRsp.info.traceId, "vgId:%d, index:%" PRId64 ", last commit index:%" PRId64, ths->vgId,
107,219,185✔
188
           pMsg->prevLogIndex + 1, returnIndex);
189
  }
190

191
  TRACE_SET_MSGID(&(rpcRsp.info.traceId), tGenIdPI64());
112,405,979✔
192
  {
193
    sGDebug(&rpcRsp.info.traceId,
112,404,983✔
194
            "vgId:%d, index:%" PRId64 ", send append entries reply, matchIndex:%" PRId64 " term:%" PRId64
195
            " lastSendIndex:%" PRId64 " to dest addr:0x%016" PRIx64,
196
            ths->vgId, pMsg->prevLogIndex + 1, pReply->matchIndex, pReply->term, pReply->lastSendIndex,
197
            pReply->destId.addr);
198
  }
199
  // ack, i.e. send response
200
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp));
112,404,983✔
201

202
  // commit index, i.e. leader notice me
203
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
224,820,315✔
204
      syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "sync-append-entries") < 0) {
112,410,504✔
205
    sGError(&pRpcMsg->info.traceId, "vgId:%d, failed to commit raft fsm log since %s", ths->vgId, terrstr());
21,621✔
206
  }
207

208
  if (resetElect) syncNodeResetElectTimer(ths);
112,409,688✔
209
  return 0;
112,410,504✔
210

211
_IGNORE:
×
212
  rpcFreeCont(rpcRsp.pCont);
×
213
  syncEntryDestroy(pEntry);
×
214
  return 0;
×
215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc