• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5057

17 May 2026 01:15AM UTC coverage: 73.406% (+0.02%) from 73.384%
#5057

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281727 of 383795 relevant lines covered (73.41%)

136101761.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.27
/source/libs/sync/src/syncRequestVote.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncRequestVote.h"
18
#include "syncMessage.h"
19
#include "syncRaftCfg.h"
20
#include "syncRaftStore.h"
21
#include "syncUtil.h"
22
#include "syncVoteMgr.h"
23

24
// TLA+ Spec
25
// HandleRequestVoteRequest(i, j, m) ==
26
//    LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
27
//                 \/ /\ m.mlastLogTerm = LastTerm(log[i])
28
//                    /\ m.mlastLogIndex >= Len(log[i])
29
//        grant == /\ m.mterm = currentTerm[i]
30
//                 /\ logOk
31
//                 /\ votedFor[i] \in {Nil, j}
32
//    IN /\ m.mterm <= currentTerm[i]
33
//       /\ \/ grant  /\ votedFor' = [votedFor EXCEPT ![i] = j]
34
//          \/ ~grant /\ UNCHANGED votedFor
35
//       /\ Reply([mtype        |-> RequestVoteResponse,
36
//                 mterm        |-> currentTerm[i],
37
//                 mvoteGranted |-> grant,
38
//                 \* mlog is used just for the `elections' history variable for
39
//                 \* the proof. It would not exist in a real implementation.
40
//                 mlog         |-> log[i],
41
//                 msource      |-> i,
42
//                 mdest        |-> j],
43
//                 m)
44
//       /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
45
//
46

47
// Decides whether the log position carried by a RequestVote message is
// acceptable compared with this node's own last log entry (the `logOk`
// predicate of the TLA+ spec quoted above).
//
// Returns true when the sender's last log term is greater than ours, or when
// the terms are equal and the sender's last log index is not smaller than
// ours. Returns false when our own last term is SYNC_TERM_INVALID, and in
// every remaining case. Each outcome is logged with both sides' positions.
static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
  SyncTerm  localLastTerm = syncNodeGetLastTerm(ths);
  SyncIndex localLastIndex = syncNodeGetLastIndex(ths);
  bool      logOK = false;

  if (localLastTerm == SYNC_TERM_INVALID) {
    // Our own last term could not be determined: refuse.
    sNWarn(ths,
           "logok:0, my last term invalid, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64
           ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}",
           localLastTerm, localLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
  } else if (pMsg->lastLogTerm > localLastTerm) {
    // A strictly larger last log term wins regardless of the index.
    sNInfo(ths,
           "logok:1, larger log term, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64
           ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}",
           localLastTerm, localLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);

    // The sender's log ends before our commit index; only a warning is emitted,
    // the request is still accepted.
    if (pMsg->lastLogIndex < ths->commitIndex) {
      sNWarn(ths,
             "logok:1, commit rollback required. {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64
             ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}",
             localLastTerm, localLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
    }
    logOK = true;
  } else if (pMsg->lastLogTerm == localLastTerm && pMsg->lastLogIndex >= localLastIndex) {
    // Same last term: the sender's log must be at least as long as ours.
    sNInfo(ths,
           "logok:1, larger log index , {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64
           ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}",
           localLastTerm, localLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
    logOK = true;
  } else {
    // Sender's last term is smaller, or equal with a shorter log.
    sNWarn(ths,
           "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
           ", recv-term:%" PRIu64 "}",
           localLastTerm, localLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
  }

  return logOK;
}
88

89
// Handles an incoming RequestVote message and sends a RequestVoteReply back
// to the sender.
//
// Parameters:
//   ths     - the local sync node that received the message.
//   pRpcMsg - the received RPC message; its pCont is read as SyncRequestVote.
//
// Returns TSDB_CODE_SUCCESS once the reply has been handed to
// syncNodeSendMsgById. Returns TSDB_CODE_SYN_NOT_IN_RAFT_GROUP when the sender
// is not in this node's raft group, TSDB_CODE_SYN_LEARNER_NO_VOTE when this
// node is a learner (no reply is sent in either case),
// TSDB_CODE_SYN_INTERNAL_ERROR when a term invariant is violated, or the error
// reported by syncBuildRequestVoteReply / syncNodeSendMsgById.
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t          ret = 0;
  SyncRequestVote* pMsg = pRpcMsg->pCont;
  bool             resetElect = false;

  syncLogRecvRequestVote(ths, pMsg, -1, "", "recv", &pRpcMsg->info.traceId);

  // the sender is no longer (or never was) a replica of this group: do not process
  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    syncLogRecvRequestVote(ths, pMsg, -1, "not in my config", "process", &pRpcMsg->info.traceId);

    TAOS_RETURN(TSDB_CODE_SYN_NOT_IN_RAFT_GROUP);
  }

  // a learner does not vote
  if (ths->state == TAOS_SYNC_STATE_LEARNER) {
    syncLogRecvRequestVote(ths, pMsg, -1, "I'm learner", "process", &pRpcMsg->info.traceId);

    TAOS_RETURN(TSDB_CODE_SYN_LEARNER_NO_VOTE);
  }

  // the log check is evaluated before the possible step-down below
  bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);

  // maybe update term
  if (pMsg->term > raftStoreGetTerm(ths)) {
    syncNodeStepDown(ths, pMsg->term, pMsg->srcId, "requestVote-1");
  }

  // re-read the term: after the step-down it must not be behind the message term
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // grant only for the current term, with an acceptable log, and when no vote
  // has been cast yet or the earlier vote went to this same sender
  bool hasVoted = raftStoreHasVoted(ths);
  bool grant =
      (pMsg->term == currentTerm) && logOK && ((!hasVoted) || syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId));
  sInfo("vgId:%d, grant:%d, hasVoted:%d, voteFor:0x%" PRIx64 ", srcId:0x%" PRIx64 ", logOK:%d, msg term:%" PRId64
        ", current term:%" PRId64,
        ths->vgId, grant, hasVoted, ths->raftStore.voteFor.addr, pMsg->srcId.addr, logOK, pMsg->term, currentTerm);
  if (grant) {
    // maybe has already voted for pMsg->srcId
    // vote again, no harm
    raftStoreVote(ths, &(pMsg->srcId));

    // candidate ?
    syncNodeStepDown(ths, currentTerm, pMsg->srcId, "requestVote-2");

    // forbid elect for this round
    resetElect = true;
  }

  // Invariant: a granted vote always carries the term that goes into the reply
  // (the reply term is currentTerm). Checked BEFORE the reply is built so that
  // this early return never exits while holding the reply buffer that
  // syncBuildRequestVoteReply places in rpcMsg.pCont.
  if (grant && pMsg->term != currentTerm) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // build the reply
  SRpcMsg rpcMsg = {0};

  TAOS_CHECK_RETURN(syncBuildRequestVoteReply(&rpcMsg, ths->vgId));

  SyncRequestVoteReply* pReply = rpcMsg.pCont;
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = currentTerm;
  pReply->voteGranted = grant;

  // trace log
  syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "", "proceed", &pRpcMsg->info.traceId);

  // reuse the request's trace id, with a freshly generated message id
  rpcMsg.info.traceId = pRpcMsg->info.traceId;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  syncLogSendRequestVoteReply(ths, pReply, "", &rpcMsg.info.traceId);

  // NOTE(review): rpcMsg.pCont is not released in this function; presumably
  // syncNodeSendMsgById takes care of it on both success and failure - confirm.
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg));

  // the election timer is reset only when a vote was granted and the reply was sent
  if (resetElect) syncNodeResetElectTimer(ths);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc