• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
SSnodeInfo gSnode = {0};
21

22

23
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
static int32_t epToJson(const void* pObj, SJson* pJson) {
241,685✔
26
  const SEp* pNode = (const SEp*)pObj;
241,685✔
27

28
  int32_t code = tjsonAddStringToObject(pJson, "fqdn", pNode->fqdn);
241,685✔
29
  if (TSDB_CODE_SUCCESS == code) {
241,685✔
30
    code = tjsonAddIntegerToObject(pJson, "port", pNode->port);
241,685✔
31
  }
32

33
  return code;
241,685✔
34
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
5,201✔
37
  SEp* pNode = (SEp*)pObj;
5,201✔
38

39
  int32_t code = tjsonGetStringValue1(pJson, "fqdn", pNode->fqdn, sizeof(pNode->fqdn));
5,201✔
40
  if (TSDB_CODE_SUCCESS == code) {
5,201✔
41
    code = tjsonGetSmallIntValue(pJson, "port", (int16_t *)&pNode->port);
5,201✔
42
  }
43

44
  return code;
5,201✔
45
}
46

47

48
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
811,027✔
49
  taosWLockLatch(&gSnode.snodeLock);
811,027✔
50
  gSnode.snodeId = pReq->snodeId;
811,027✔
51
  gSnode.snodeLeaders[0] = pReq->leaders[0];
811,027✔
52
  gSnode.snodeLeaders[1] = pReq->leaders[1];  
811,027✔
53
  gSnode.snodeReplica = pReq->replica;
811,027✔
54
  taosWUnLockLatch(&gSnode.snodeLock);
811,027✔
55
}
811,027✔
56

57
SEpSet* dmGetSynEpset(int32_t leaderId) {
397,969✔
58
  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
397,969✔
59
    return &gSnode.snodeReplica.epSet;
9,120✔
60
  } 
61
  for (int32_t i = 0; i < 2; ++i) {
1,166,547✔
62
    if (gSnode.snodeLeaders[i].nodeId == leaderId) {
777,698✔
UNCOV
63
      return &gSnode.snodeLeaders[i].epSet;
×
64
    }
65
  }
66
  return NULL;
388,849✔
67
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
651,776✔
70
  SJson* pLeader0 = NULL;
651,776✔
71
  SJson* pLeader1 = NULL;
651,776✔
72
  SJson* pReplica = NULL;
651,776✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
651,776✔
74

75
  // Read encrypted flag (optional, defaults to false for backward compatibility)
76
  int32_t encrypted = 0;
651,776✔
77
  if (TSDB_CODE_SUCCESS == code) {
651,776✔
78
    code = tjsonGetIntValue(pJson, "encrypted", &encrypted);
651,776✔
79
    if (TSDB_CODE_SUCCESS == code) {
651,776✔
80
      // Update global snode encrypted flag
81
      taosWLockLatch(&gSnode.snodeLock);
651,776✔
82
      gSnode.encrypted = encrypted;
651,776✔
83
      taosWUnLockLatch(&gSnode.snodeLock);
651,776✔
84
    }
85
  }
86

87
  if (TSDB_CODE_SUCCESS == code) {
651,776✔
88
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
651,776✔
89
    if (pLeader0) {
651,776✔
90
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
3,416✔
91
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
92
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
3,416✔
93
      }
94
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
95
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
3,416✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
98
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
3,416✔
99
      }
100
    }
101
  }
102

103
  if (TSDB_CODE_SUCCESS == code) {
651,776✔
104
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
651,776✔
105
    if (pLeader1) {
651,776✔
106
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
3,416✔
107
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
108
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
3,416✔
109
      }
110
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
111
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
3,416✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
114
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
3,416✔
115
      }
116
    }
117
  }
118

119
  if (TSDB_CODE_SUCCESS == code) {
651,776✔
120
    pReplica = tjsonGetObjectItem(pJson, "replica");
651,776✔
121
    if (pReplica) {
651,776✔
122
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
3,416✔
123
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
124
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
3,416✔
125
      }
126
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
127
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
3,416✔
128
      }
129
      if (TSDB_CODE_SUCCESS == code) {
3,416✔
130
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
3,416✔
131
      }
132
    }
133
  }
134

135
  return code;
651,776✔
136
}
137

138
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
159,251✔
139
  int32_t          code = 0;
159,251✔
140
  int32_t          lino = 0;
159,251✔
141
  SDCreateSnodeReq createReq = {0};
159,251✔
142
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
159,251✔
143
    code = TSDB_CODE_INVALID_MSG;
×
144
    return code;
×
145
  }
146

147
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
159,251✔
148
    code = TSDB_CODE_INVALID_OPTION;
×
149
    dError("failed to create snode since %s", tstrerror(code));
×
150
    goto _exit;
×
151
  }
152

153
  bool deployed = true;
159,251✔
154
  SJson *pJson = tjsonCreateObject();
159,251✔
155
  if (pJson == NULL) {
159,251✔
156
    code = terrno;
×
157
    dError("failed to create json object since %s", tstrerror(code));
×
158
    goto _exit;
×
159
  }
160

161
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
159,251✔
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
159,251✔
163

164
  SJson *leader0 = tjsonCreateObject();
159,251✔
165
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
159,251✔
166
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
159,251✔
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
159,251✔
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
159,251✔
169
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
159,251✔
170

171
  SJson *leader1 = tjsonCreateObject();
159,251✔
172
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
159,251✔
173
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
159,251✔
174
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
159,251✔
175
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
159,251✔
176
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
159,251✔
177

178
  SJson *replica = tjsonCreateObject();
159,251✔
179
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
159,251✔
180
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
159,251✔
181
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
159,251✔
182
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
159,251✔
183
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
159,251✔
184

185
  char path[TSDB_FILENAME_LEN];
159,170✔
186
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
159,251✔
187

188
  if (taosMulMkDir(path) != 0) {
159,251✔
189
    code = terrno;
×
190
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
191
    goto _exit;
×
192
  }
193

194
  dInfo("path %s created", path);
159,251✔
195
  
196
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
159,251✔
197
    dError("failed to write snode file since %s", tstrerror(code));
×
198
    goto _exit;
×
199
  }
200

201
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
159,251✔
202
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
69,495✔
203
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
69,495✔
204
  }
205
  smUpdateSnodeInfo(&createReq);
159,251✔
206

207
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
159,251✔
208

209
_exit:
159,170✔
210

211
  tFreeSDCreateSnodeReq(&createReq);
159,251✔
212
  
213
  return code;
159,251✔
214
}
215

216
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
34,006✔
217
  int32_t        code = 0;
34,006✔
218
  SDDropSnodeReq dropReq = {0};
34,006✔
219
  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
34,006✔
220
    code = TSDB_CODE_INVALID_MSG;
×
221

222
    return code;
×
223
  }
224

225
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
34,006✔
226
    code = TSDB_CODE_INVALID_OPTION;
×
227
    dError("failed to drop snode since %s", tstrerror(code));
×
228
    tFreeSMCreateQnodeReq(&dropReq);
×
229
    return code;
×
230
  }
231

232
  char path[TSDB_FILENAME_LEN];
34,006✔
233
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);
34,006✔
234

235
  streamDeleteAllCheckpoints();
34,006✔
236

237
  bool deployed = false;
34,006✔
238
  if ((code = dmWriteFile(path, pInput->name, deployed)) != 0) {
34,006✔
239
    dError("failed to write snode file since %s", tstrerror(code));
×
240
    tFreeSMCreateQnodeReq(&dropReq);
×
241
    return code;
×
242
  }
243

244
  smUndeploySnodeTasks(true);
34,006✔
245

246
  tFreeSMCreateQnodeReq(&dropReq);
34,006✔
247
  return 0;
34,006✔
248
}
249

250
SArray *smGetMsgHandles() {
651,590✔
251
  int32_t code = -1;
651,590✔
252
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
651,590✔
253
  if (pArray == NULL) goto _OVER;
651,590✔
254

255
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
651,590✔
256
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
651,590✔
257
  
258
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
651,590✔
259

260
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
651,590✔
261
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
651,590✔
262
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
651,590✔
263
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
651,590✔
264

265
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
651,590✔
266
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
651,590✔
267
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
651,590✔
268
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
651,590✔
269
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
651,590✔
270

271
  code = 0;
651,590✔
272
  
273
_OVER:
651,590✔
274
  if (code != 0) {
651,590✔
275
    taosArrayDestroy(pArray);
×
276
    return NULL;
×
277
  } else {
278
    return pArray;
651,590✔
279
  }
280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc