• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5056

17 May 2026 01:15AM UTC coverage: 73.384% (+0.03%) from 73.355%
#5056

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281643 of 383795 relevant lines covered (73.38%)

135942701.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.61
/source/libs/sync/src/syncAppendEntries.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncAppendEntries.h"
18
#include "syncPipeline.h"
19
#include "syncMessage.h"
20
#include "syncRaftLog.h"
21
#include "syncRaftStore.h"
22
#include "syncReplication.h"
23
#include "syncUtil.h"
24
#include "syncCommit.h"
25

26
// TLA+ Spec
27
// HandleAppendEntriesRequest(i, j, m) ==
28
//    LET logOk == \/ m.mprevLogIndex = 0
29
//                 \/ /\ m.mprevLogIndex > 0
30
//                    /\ m.mprevLogIndex <= Len(log[i])
31
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
32
//    IN /\ m.mterm <= currentTerm[i]
33
//       /\ \/ /\ \* reject request
34
//                \/ m.mterm < currentTerm[i]
35
//                \/ /\ m.mterm = currentTerm[i]
36
//                   /\ state[i] = Follower
37
//                   /\ \lnot logOk
38
//             /\ Reply([mtype           |-> AppendEntriesResponse,
39
//                       mterm           |-> currentTerm[i],
40
//                       msuccess        |-> FALSE,
41
//                       mmatchIndex     |-> 0,
42
//                       msource         |-> i,
43
//                       mdest           |-> j],
44
//                       m)
45
//             /\ UNCHANGED <<serverVars, logVars>>
46
//          \/ \* return to follower state
47
//             /\ m.mterm = currentTerm[i]
48
//             /\ state[i] = Candidate
49
//             /\ state' = [state EXCEPT ![i] = Follower]
50
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
51
//          \/ \* accept request
52
//             /\ m.mterm = currentTerm[i]
53
//             /\ state[i] = Follower
54
//             /\ logOk
55
//             /\ LET index == m.mprevLogIndex + 1
56
//                IN \/ \* already done with request
57
//                       /\ \/ m.mentries = << >>
58
//                          \/ /\ m.mentries /= << >>
59
//                             /\ Len(log[i]) >= index
60
//                             /\ log[i][index].term = m.mentries[1].term
61
//                          \* This could make our commitIndex decrease (for
62
//                          \* example if we process an old, duplicated request),
63
//                          \* but that doesn't really affect anything.
64
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
65
//                                              m.mcommitIndex]
66
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
67
//                                 mterm           |-> currentTerm[i],
68
//                                 msuccess        |-> TRUE,
69
//                                 mmatchIndex     |-> m.mprevLogIndex +
70
//                                                     Len(m.mentries),
71
//                                 msource         |-> i,
72
//                                 mdest           |-> j],
73
//                                 m)
74
//                       /\ UNCHANGED <<serverVars, log>>
75
//                   \/ \* conflict: remove 1 entry
76
//                       /\ m.mentries /= << >>
77
//                       /\ Len(log[i]) >= index
78
//                       /\ log[i][index].term /= m.mentries[1].term
79
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
80
//                                          log[i][index2]]
81
//                          IN log' = [log EXCEPT ![i] = new]
82
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
83
//                   \/ \* no conflict: append entry
84
//                       /\ m.mentries /= << >>
85
//                       /\ Len(log[i]) = m.mprevLogIndex
86
//                       /\ log' = [log EXCEPT ![i] =
87
//                                      Append(log[i], m.mentries[1])]
88
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
89
//       /\ UNCHANGED <<candidateVars, leaderVars>>
90
//
91

92
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
515,880,396✔
93
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
515,880,396✔
94
  SRpcMsg            rpcRsp = {0};
515,880,396✔
95
  bool               accepted = false;
515,880,396✔
96
  SSyncRaftEntry*    pEntry = NULL;
515,880,396✔
97
  bool               resetElect = false;
515,880,396✔
98

99
  // if already drop replica, do not process
100
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
515,880,396✔
101
    syncLogRecvAppendEntries(ths, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
102
    goto _IGNORE;
×
103
  }
104

105
  int64_t nRef = atomic_add_fetch_64(&ths->recvCount, 1);
515,880,396✔
106
  if (nRef <= 0) {
515,880,396✔
107
    sError("vgId:%d, recv count is %" PRId64, ths->vgId, nRef);
×
108
  }
109

110
  int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
515,880,396✔
111
  if (code != 0) {
515,880,267✔
112
    syncLogRecvAppendEntries(ths, pMsg, "build rsp error", &pRpcMsg->info.traceId);
×
113
    goto _IGNORE;
×
114
  }
115
  rpcRsp.info.traceId = pRpcMsg->info.traceId;
515,880,267✔
116

117
  SyncAppendEntriesReply* pReply = rpcRsp.pCont;
515,880,267✔
118
  // prepare response msg
119
  pReply->srcId = ths->myRaftId;
515,880,267✔
120
  pReply->destId = pMsg->srcId;
515,880,396✔
121
  pReply->term = raftStoreGetTerm(ths);
515,879,803✔
122
  pReply->success = false;
515,880,396✔
123
  pReply->matchIndex = SYNC_INDEX_INVALID;
515,880,396✔
124
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
515,880,258✔
125
  pReply->startTime = ths->startTime;
515,880,258✔
126
  pReply->appliedIndex = ths->pFsm->FpAppliedIndexCb(ths->pFsm);
515,880,258✔
127

128
  if (pMsg->term < raftStoreGetTerm(ths)) {
515,880,258✔
129
    goto _SEND_RESPONSE;
520✔
130
  }
131

132
  if (pMsg->term > raftStoreGetTerm(ths)) {
515,879,876✔
133
    pReply->term = pMsg->term;
915,236✔
134
  }
135

136
  if(ths->raftCfg.cfg.nodeInfo[ths->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){
515,879,876✔
137
    syncNodeStepDown(ths, pMsg->term, pMsg->srcId, "appendEntry");
415,761,077✔
138
    resetElect = true;
415,760,707✔
139
  }
140

141
  if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
515,879,506✔
142
    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
×
143
           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
144
    goto _IGNORE;
×
145
  }
146

147
  pEntry = syncEntryBuildFromAppendEntries(pMsg);
515,879,506✔
148
  if (pEntry == NULL) {
515,879,876✔
149
    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
×
150
    goto _IGNORE;
×
151
  }
152

153
  if (pMsg->prevLogIndex + 1 != pEntry->index || pEntry->term < 0) {
515,879,876✔
154
    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ",  term:%" PRId64 ", prevLogIndex:%" PRId64
×
155
           ", prevLogTerm:%" PRId64,
156
           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
157
    goto _IGNORE;
×
158
  }
159

160
  sGDebug(&pRpcMsg->info.traceId,
515,879,876✔
161
          "vgId:%d, index:%" PRId64 ", recv append entries msg, term:%" PRId64 ", preLogIndex:%" PRId64
162
          ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
163
          pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
164
          pEntry->term);
165

166
  if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
515,879,876✔
167
    pReply->fsmState = ths->fsmState;
×
168
    sWarn("vgId:%d, unable to accept, due to incomplete fsm state. index:%" PRId64, ths->vgId, pEntry->index);
×
169
    syncEntryDestroy(pEntry);
×
170
    goto _SEND_RESPONSE;
×
171
  }
172

173
  // accept
174
  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
515,879,876✔
175
    goto _SEND_RESPONSE;
317,660✔
176
  }
177
  accepted = true;
515,561,844✔
178

179
_SEND_RESPONSE:
515,880,024✔
180
  pEntry = NULL;
515,880,024✔
181
  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn", pRpcMsg);
515,880,024✔
182
  bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
515,878,946✔
183
  if (accepted && matched) {
515,879,291✔
184
    pReply->success = true;
379,446,075✔
185
    // update commit index only after matching
186
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex));
379,447,253✔
187
    sGDebug(&rpcRsp.info.traceId, "vgId:%d, index:%" PRId64 ", last commit index:%" PRId64, ths->vgId,
379,444,709✔
188
           pMsg->prevLogIndex + 1, returnIndex);
189
  }
190

191
  TRACE_SET_MSGID(&(rpcRsp.info.traceId), tGenIdPI64());
515,877,925✔
192
  {
193
    sGDebug(&rpcRsp.info.traceId,
515,872,631✔
194
            "vgId:%d, index:%" PRId64 ", send append entries reply, matchIndex:%" PRId64 " term:%" PRId64
195
            " lastSendIndex:%" PRId64 " to dest addr:0x%016" PRIx64,
196
            ths->vgId, pMsg->prevLogIndex + 1, pReply->matchIndex, pReply->term, pReply->lastSendIndex,
197
            pReply->destId.addr);
198
  }
199
  // ack, i.e. send response
200
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp));
515,872,631✔
201

202
  // commit index, i.e. leader notice me
203
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
1,031,759,806✔
204
      syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "sync-append-entries") < 0) {
515,880,396✔
205
    sGError(&pRpcMsg->info.traceId, "vgId:%d, failed to commit raft fsm log since %s", ths->vgId, terrstr());
173,917✔
206
  }
207

208
  if (resetElect) syncNodeResetElectTimer(ths);
515,880,048✔
209
  return 0;
515,880,396✔
210

211
_IGNORE:
×
212
  rpcFreeCont(rpcRsp.pCont);
×
213
  syncEntryDestroy(pEntry);
×
214
  return 0;
×
215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc