• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5055

17 May 2026 01:15AM UTC coverage: 73.355% (-0.003%) from 73.358%
#5055

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281532 of 383795 relevant lines covered (73.35%)

135557734.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
SSnodeInfo gSnode = {0};
21

22

23
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
static int32_t epToJson(const void* pObj, SJson* pJson) {
303,265✔
26
  const SEp* pNode = (const SEp*)pObj;
303,265✔
27

28
  int32_t code = tjsonAddStringToObject(pJson, "fqdn", pNode->fqdn);
303,265✔
29
  if (TSDB_CODE_SUCCESS == code) {
303,265✔
30
    code = tjsonAddIntegerToObject(pJson, "port", pNode->port);
303,265✔
31
  }
32

33
  return code;
303,265✔
34
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
5,699✔
37
  SEp* pNode = (SEp*)pObj;
5,699✔
38

39
  int32_t code = tjsonGetStringValue1(pJson, "fqdn", pNode->fqdn, sizeof(pNode->fqdn));
5,699✔
40
  if (TSDB_CODE_SUCCESS == code) {
5,699✔
41
    code = tjsonGetSmallIntValue(pJson, "port", (int16_t *)&pNode->port);
5,699✔
42
  }
43

44
  return code;
5,699✔
45
}
46

47

48
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
930,973✔
49
  taosWLockLatch(&gSnode.snodeLock);
930,973✔
50
  gSnode.snodeId = pReq->snodeId;
930,973✔
51
  gSnode.snodeLeaders[0] = pReq->leaders[0];
930,973✔
52
  gSnode.snodeLeaders[1] = pReq->leaders[1];  
930,973✔
53
  gSnode.snodeReplica = pReq->replica;
930,973✔
54
  taosWUnLockLatch(&gSnode.snodeLock);
930,973✔
55
}
930,973✔
56

57
SEpSet* dmGetSynEpset(int32_t leaderId) {
485,079✔
58
  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
485,079✔
59
    return &gSnode.snodeReplica.epSet;
10,695✔
60
  } 
61
  for (int32_t i = 0; i < 2; ++i) {
1,423,152✔
62
    if (gSnode.snodeLeaders[i].nodeId == leaderId) {
948,768✔
63
      return &gSnode.snodeLeaders[i].epSet;
×
64
    }
65
  }
66
  return NULL;
474,384✔
67
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
728,241✔
70
  SJson* pLeader0 = NULL;
728,241✔
71
  SJson* pLeader1 = NULL;
728,241✔
72
  SJson* pReplica = NULL;
728,241✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
728,241✔
74

75
  // Read encrypted flag (optional, defaults to false for backward compatibility)
76
  int32_t encrypted = 0;
728,241✔
77
  if (TSDB_CODE_SUCCESS == code) {
728,241✔
78
    code = tjsonGetIntValue(pJson, "encrypted", &encrypted);
728,241✔
79
    if (TSDB_CODE_SUCCESS == code) {
728,241✔
80
      // Update global snode encrypted flag
81
      taosWLockLatch(&gSnode.snodeLock);
728,241✔
82
      gSnode.encrypted = encrypted;
728,241✔
83
      taosWUnLockLatch(&gSnode.snodeLock);
728,241✔
84
    }
85
  }
86

87
  if (TSDB_CODE_SUCCESS == code) {
728,241✔
88
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
728,241✔
89
    if (pLeader0) {
728,241✔
90
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
3,637✔
91
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
92
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
3,637✔
93
      }
94
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
95
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
3,637✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
98
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
3,637✔
99
      }
100
    }
101
  }
102

103
  if (TSDB_CODE_SUCCESS == code) {
728,241✔
104
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
728,241✔
105
    if (pLeader1) {
728,241✔
106
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
3,637✔
107
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
108
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
3,637✔
109
      }
110
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
111
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
3,637✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
114
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
3,637✔
115
      }
116
    }
117
  }
118

119
  if (TSDB_CODE_SUCCESS == code) {
728,241✔
120
    pReplica = tjsonGetObjectItem(pJson, "replica");
728,241✔
121
    if (pReplica) {
728,241✔
122
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
3,637✔
123
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
124
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
3,637✔
125
      }
126
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
127
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
3,637✔
128
      }
129
      if (TSDB_CODE_SUCCESS == code) {
3,637✔
130
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
3,637✔
131
      }
132
    }
133
  }
134

135
  return code;
728,241✔
136
}
137

138
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
202,732✔
139
  int32_t          code = 0;
202,732✔
140
  int32_t          lino = 0;
202,732✔
141
  SDCreateSnodeReq createReq = {0};
202,732✔
142
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
202,732✔
143
    code = TSDB_CODE_INVALID_MSG;
×
144
    return code;
×
145
  }
146

147
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
202,732✔
148
    code = TSDB_CODE_INVALID_OPTION;
×
149
    dError("failed to create snode since %s", tstrerror(code));
×
150
    goto _exit;
×
151
  }
152

153
  bool deployed = true;
202,732✔
154
  SJson *pJson = tjsonCreateObject();
202,732✔
155
  if (pJson == NULL) {
202,732✔
156
    code = terrno;
×
157
    dError("failed to create json object since %s", tstrerror(code));
×
158
    goto _exit;
×
159
  }
160

161
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
202,732✔
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
202,732✔
163

164
  SJson *leader0 = tjsonCreateObject();
202,732✔
165
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
202,732✔
166
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
202,732✔
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
202,732✔
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
202,732✔
169
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
202,732✔
170

171
  SJson *leader1 = tjsonCreateObject();
202,732✔
172
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
202,732✔
173
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
202,732✔
174
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
202,732✔
175
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
202,732✔
176
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
202,732✔
177

178
  SJson *replica = tjsonCreateObject();
202,732✔
179
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
202,732✔
180
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
202,732✔
181
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
202,732✔
182
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
202,732✔
183
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
202,732✔
184

185
  char path[TSDB_FILENAME_LEN];
202,647✔
186
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
202,732✔
187

188
  if (taosMulMkDir(path) != 0) {
202,732✔
189
    code = terrno;
×
190
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
191
    goto _exit;
×
192
  }
193

194
  dInfo("path %s created", path);
202,732✔
195
  
196
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
202,732✔
197
    dError("failed to write snode file since %s", tstrerror(code));
×
198
    goto _exit;
×
199
  }
200

201
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
202,732✔
202
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
88,918✔
203
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
88,918✔
204
  }
205
  smUpdateSnodeInfo(&createReq);
202,732✔
206

207
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
202,732✔
208

209
_exit:
202,647✔
210

211
  tFreeSDCreateSnodeReq(&createReq);
202,732✔
212
  
213
  return code;
202,732✔
214
}
215

216
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
43,002✔
217
  int32_t        code = 0;
43,002✔
218
  SDDropSnodeReq dropReq = {0};
43,002✔
219
  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
43,002✔
220
    code = TSDB_CODE_INVALID_MSG;
×
221

222
    return code;
×
223
  }
224

225
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
43,002✔
226
    code = TSDB_CODE_INVALID_OPTION;
×
227
    dError("failed to drop snode since %s", tstrerror(code));
×
228
    tFreeSMCreateQnodeReq(&dropReq);
×
229
    return code;
×
230
  }
231

232
  char path[TSDB_FILENAME_LEN];
43,002✔
233
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);
43,002✔
234

235
  streamDeleteAllCheckpoints();
43,002✔
236

237
  bool deployed = false;
43,002✔
238
  if ((code = dmWriteFile(path, pInput->name, deployed)) != 0) {
43,002✔
239
    dError("failed to write snode file since %s", tstrerror(code));
×
240
    tFreeSMCreateQnodeReq(&dropReq);
×
241
    return code;
×
242
  }
243

244
  smUndeploySnodeTasks(true);
43,002✔
245

246
  tFreeSMCreateQnodeReq(&dropReq);
43,002✔
247
  return 0;
43,002✔
248
}
249

250
SArray *smGetMsgHandles() {
727,308✔
251
  int32_t code = -1;
727,308✔
252
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
727,308✔
253
  if (pArray == NULL) goto _OVER;
727,308✔
254

255
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
727,308✔
256
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
727,308✔
257
  
258
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
727,308✔
259

260
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
727,308✔
261
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
727,308✔
262
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
727,308✔
263
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
727,308✔
264

265
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
727,308✔
266
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
727,308✔
267
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
727,308✔
268
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
727,308✔
269
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
727,308✔
270

271
  code = 0;
727,308✔
272
  
273
_OVER:
727,308✔
274
  if (code != 0) {
727,308✔
275
    taosArrayDestroy(pArray);
×
276
    return NULL;
×
277
  } else {
278
    return pArray;
727,308✔
279
  }
280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc