• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.39
/source/libs/sync/src/syncAppendEntries.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncAppendEntries.h"
18
#include "syncPipeline.h"
19
#include "syncMessage.h"
20
#include "syncRaftLog.h"
21
#include "syncRaftStore.h"
22
#include "syncReplication.h"
23
#include "syncUtil.h"
24
#include "syncCommit.h"
25

26
// TLA+ Spec
27
// HandleAppendEntriesRequest(i, j, m) ==
28
//    LET logOk == \/ m.mprevLogIndex = 0
29
//                 \/ /\ m.mprevLogIndex > 0
30
//                    /\ m.mprevLogIndex <= Len(log[i])
31
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
32
//    IN /\ m.mterm <= currentTerm[i]
33
//       /\ \/ /\ \* reject request
34
//                \/ m.mterm < currentTerm[i]
35
//                \/ /\ m.mterm = currentTerm[i]
36
//                   /\ state[i] = Follower
37
//                   /\ \lnot logOk
38
//             /\ Reply([mtype           |-> AppendEntriesResponse,
39
//                       mterm           |-> currentTerm[i],
40
//                       msuccess        |-> FALSE,
41
//                       mmatchIndex     |-> 0,
42
//                       msource         |-> i,
43
//                       mdest           |-> j],
44
//                       m)
45
//             /\ UNCHANGED <<serverVars, logVars>>
46
//          \/ \* return to follower state
47
//             /\ m.mterm = currentTerm[i]
48
//             /\ state[i] = Candidate
49
//             /\ state' = [state EXCEPT ![i] = Follower]
50
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
51
//          \/ \* accept request
52
//             /\ m.mterm = currentTerm[i]
53
//             /\ state[i] = Follower
54
//             /\ logOk
55
//             /\ LET index == m.mprevLogIndex + 1
56
//                IN \/ \* already done with request
57
//                       /\ \/ m.mentries = << >>
58
//                          \/ /\ m.mentries /= << >>
59
//                             /\ Len(log[i]) >= index
60
//                             /\ log[i][index].term = m.mentries[1].term
61
//                          \* This could make our commitIndex decrease (for
62
//                          \* example if we process an old, duplicated request),
63
//                          \* but that doesn't really affect anything.
64
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
65
//                                              m.mcommitIndex]
66
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
67
//                                 mterm           |-> currentTerm[i],
68
//                                 msuccess        |-> TRUE,
69
//                                 mmatchIndex     |-> m.mprevLogIndex +
70
//                                                     Len(m.mentries),
71
//                                 msource         |-> i,
72
//                                 mdest           |-> j],
73
//                                 m)
74
//                       /\ UNCHANGED <<serverVars, log>>
75
//                   \/ \* conflict: remove 1 entry
76
//                       /\ m.mentries /= << >>
77
//                       /\ Len(log[i]) >= index
78
//                       /\ log[i][index].term /= m.mentries[1].term
79
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
80
//                                          log[i][index2]]
81
//                          IN log' = [log EXCEPT ![i] = new]
82
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
83
//                   \/ \* no conflict: append entry
84
//                       /\ m.mentries /= << >>
85
//                       /\ Len(log[i]) = m.mprevLogIndex
86
//                       /\ log' = [log EXCEPT ![i] =
87
//                                      Append(log[i], m.mentries[1])]
88
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
89
//       /\ UNCHANGED <<candidateVars, leaderVars>>
90
//
91

92
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
353,647✔
93
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
353,647✔
94
  SRpcMsg            rpcRsp = {0};
353,647✔
95
  bool               accepted = false;
353,647✔
96
  SSyncRaftEntry*    pEntry = NULL;
353,647✔
97
  bool               resetElect = false;
353,647✔
98
  const STraceId*    trace = &pRpcMsg->info.traceId;
353,647✔
99
  char               tbuf[40] = {0};
353,647✔
100

101
  // if already drop replica, do not process
102
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
353,647✔
103
    syncLogRecvAppendEntries(ths, pMsg, "not in my config");
2✔
104
    goto _IGNORE;
2✔
105
  }
106

107
  int32_t nRef = atomic_fetch_add_32(&ths->recvCount, 1);
353,657✔
108
  if (nRef <= 0) {
353,657✔
109
    sError("vgId:%d, recv count is %d", ths->vgId, nRef);
2,264!
110
  }
111

112
  int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
353,657✔
113
  if (code != 0) {
353,657!
UNCOV
114
    syncLogRecvAppendEntries(ths, pMsg, "build rsp error");
×
UNCOV
115
    goto _IGNORE;
×
116
  }
117

118
  SyncAppendEntriesReply* pReply = rpcRsp.pCont;
353,657✔
119
  // prepare response msg
120
  pReply->srcId = ths->myRaftId;
353,657✔
121
  pReply->destId = pMsg->srcId;
353,657✔
122
  pReply->term = raftStoreGetTerm(ths);
353,657✔
123
  pReply->success = false;
353,657✔
124
  pReply->matchIndex = SYNC_INDEX_INVALID;
353,657✔
125
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
353,657✔
126
  pReply->startTime = ths->startTime;
353,657✔
127

128
  if (pMsg->term < raftStoreGetTerm(ths)) {
353,657✔
129
    goto _SEND_RESPONSE;
2✔
130
  }
131

132
  if (pMsg->term > raftStoreGetTerm(ths)) {
353,654✔
133
    pReply->term = pMsg->term;
332✔
134
  }
135

136
  if(ths->raftCfg.cfg.nodeInfo[ths->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){
353,654✔
137
    syncNodeStepDown(ths, pMsg->term);
326,717✔
138
    resetElect = true;
326,711✔
139
  }
140

141
  if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
353,648!
UNCOV
142
    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
×
143
           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
144
    goto _IGNORE;
×
145
  }
146

147
  pEntry = syncEntryBuildFromAppendEntries(pMsg);
353,648✔
148
  if (pEntry == NULL) {
353,645!
UNCOV
149
    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
×
UNCOV
150
    goto _IGNORE;
×
151
  }
152

153
  if (pMsg->prevLogIndex + 1 != pEntry->index || pEntry->term < 0) {
353,645!
UNCOV
154
    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ",  term:%" PRId64 ", prevLogIndex:%" PRId64
×
155
           ", prevLogTerm:%" PRId64,
156
           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
UNCOV
157
    goto _IGNORE;
×
158
  }
159

160
  sGTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
353,646!
161
          ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
162
          pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
163
          pEntry->term);
164

165
  if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
353,646!
UNCOV
166
    pReply->fsmState = ths->fsmState;
×
UNCOV
167
    sWarn("vgId:%d, unable to accept, due to incomplete fsm state. index:%" PRId64, ths->vgId, pEntry->index);
×
UNCOV
168
    syncEntryDestroy(pEntry);
×
UNCOV
169
    goto _SEND_RESPONSE;
×
170
  }
171

172
  // accept
173
  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
353,646✔
174
    goto _SEND_RESPONSE;
623✔
175
  }
176
  accepted = true;
353,019✔
177

178
_SEND_RESPONSE:
353,644✔
179
  pEntry = NULL;
353,644✔
180
  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn");
353,644✔
181
  bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
353,632✔
182
  if (accepted && matched) {
353,632✔
183
    pReply->success = true;
328,036✔
184
    // update commit index only after matching
185
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex));
328,036✔
186
    sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
328,037!
187
  }
188

189
  TRACE_SET_MSGID(&(rpcRsp.info.traceId), tGenIdPI64());
353,633✔
190
  trace = &(rpcRsp.info.traceId);
353,654✔
191
  sGTrace("vgId:%d, send append reply matchIndex:%" PRId64 " term:%" PRId64 " lastSendIndex:%" PRId64
353,654!
192
          " to dest: 0x%016" PRIx64,
193
          ths->vgId, pReply->matchIndex, pReply->term, pReply->lastSendIndex, pReply->destId.addr);
194
  // ack, i.e. send response
195
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp));
353,654!
196

197
  // commit index, i.e. leader notice me
198
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
353,655!
199
    sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
4!
200
  }
201

202
  if (resetElect) syncNodeResetElectTimer(ths);
353,656✔
203
  return 0;
353,657✔
204

205
_IGNORE:
2✔
206
  rpcFreeCont(rpcRsp.pCont);
2✔
207
  syncEntryDestroy(pEntry);
2✔
208
  return 0;
2✔
209
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc