• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.05
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
/* File-level snode state (snode id, the two leader entries and the replica entry).
 * smUpdateSnodeInfo() rewrites it while holding gSnode.snodeLock. */
SSnodeInfo gSnode = {0};
21

22

23
/* Monitor-info hook for the snode manager. The body is intentionally empty:
 * neither pMgmt nor smInfo is read or written. */
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
/**
 * Serialize one endpoint into a JSON object as {"fqdn": <string>, "port": <integer>}.
 * Passed as the per-element callback to tjsonAddArray().
 *
 * @param pObj  endpoint to serialize; interpreted as a const SEp*.
 * @param pJson JSON object that receives the two fields.
 * @return TSDB_CODE_SUCCESS, or the code of the first tjsonAdd* call that did not succeed.
 */
static int32_t epToJson(const void* pObj, SJson* pJson) {
  const SEp* pEp = (const SEp*)pObj;

  int32_t rc = tjsonAddStringToObject(pJson, "fqdn", pEp->fqdn);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  /* The port is only written once the fqdn has been added successfully. */
  return tjsonAddIntegerToObject(pJson, "port", pEp->port);
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
4,285✔
37
  SEp* pNode = (SEp*)pObj;
4,285✔
38

39
  int32_t code = tjsonGetStringValue(pJson, "fqdn", pNode->fqdn);
4,285✔
40
  if (TSDB_CODE_SUCCESS == code) {
4,285✔
41
    code = tjsonGetSmallIntValue(pJson, "port", &pNode->port);
4,285✔
42
  }
43

44
  return code;
4,285✔
45
}
46

47

48
/**
 * Copy the snode id, both leader entries and the replica entry from a create
 * request into the file-level gSnode state. All four writes happen while
 * gSnode.snodeLock is held for writing.
 *
 * @param pReq create request whose snodeId, leaders[0..1] and replica are copied.
 */
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
  SSnodeInfo* pInfo = &gSnode;

  taosWLockLatch(&pInfo->snodeLock);

  pInfo->snodeId = pReq->snodeId;
  for (int32_t idx = 0; idx < 2; ++idx) {
    pInfo->snodeLeaders[idx] = pReq->leaders[idx];
  }
  pInfo->snodeReplica = pReq->replica;

  taosWUnLockLatch(&pInfo->snodeLock);
}
695,601✔
56

57
SEpSet* dmGetSynEpset(int32_t leaderId) {
360,102✔
58
  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
360,102✔
59
    return &gSnode.snodeReplica.epSet;
6,803✔
60
  } 
61
  for (int32_t i = 0; i < 2; ++i) {
1,059,897✔
62
    if (gSnode.snodeLeaders[i].nodeId == leaderId) {
706,598✔
UNCOV
63
      return &gSnode.snodeLeaders[i].epSet;
×
64
    }
65
  }
66
  return NULL;
353,299✔
67
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
550,330✔
70
  SJson* pLeader0 = NULL;
550,330✔
71
  SJson* pLeader1 = NULL;
550,330✔
72
  SJson* pReplica = NULL;
550,330✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
550,330✔
74
  if (TSDB_CODE_SUCCESS == code) {
550,330✔
75
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
550,330✔
76
    if (pLeader0) {
550,330✔
77
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
2,864✔
78
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
79
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
2,864✔
80
      }
81
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
82
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
2,864✔
83
      }
84
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
85
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
2,864✔
86
      }
87
    }
88
  }
89

90
  if (TSDB_CODE_SUCCESS == code) {
550,330✔
91
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
550,330✔
92
    if (pLeader1) {
550,330✔
93
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
2,864✔
94
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
95
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
2,864✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
98
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
2,864✔
99
      }
100
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
101
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
2,864✔
102
      }
103
    }
104
  }
105

106
  if (TSDB_CODE_SUCCESS == code) {
550,330✔
107
    pReplica = tjsonGetObjectItem(pJson, "replica");
550,330✔
108
    if (pReplica) {
550,330✔
109
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
2,864✔
110
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
111
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
2,864✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
114
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
2,864✔
115
      }
116
      if (TSDB_CODE_SUCCESS == code) {
2,864✔
117
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
2,864✔
118
      }
119
    }
120
  }
121

122
  return code;
550,330✔
123
}
124

125
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
145,271✔
126
  int32_t          code = 0;
145,271✔
127
  int32_t          lino = 0;
145,271✔
128
  SDCreateSnodeReq createReq = {0};
145,271✔
129
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
145,271✔
130
    code = TSDB_CODE_INVALID_MSG;
×
131
    return code;
×
132
  }
133

134
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
145,271✔
135
    code = TSDB_CODE_INVALID_OPTION;
×
136
    dError("failed to create snode since %s", tstrerror(code));
×
137
    goto _exit;
×
138
  }
139

140
  bool deployed = true;
145,271✔
141
  SJson *pJson = tjsonCreateObject();
145,271✔
142
  if (pJson == NULL) {
145,271✔
143
    code = terrno;
×
144
    dError("failed to create json object since %s", tstrerror(code));
×
145
    goto _exit;
×
146
  }
147

148
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
145,271✔
149
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
145,271✔
150

151
  SJson *leader0 = tjsonCreateObject();
145,271✔
152
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
145,271✔
153
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
145,271✔
154
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
145,271✔
155
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
145,271✔
156
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
145,271✔
157

158
  SJson *leader1 = tjsonCreateObject();
145,271✔
159
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
145,271✔
160
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
145,271✔
161
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
145,271✔
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
145,271✔
163
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
145,271✔
164

165
  SJson *replica = tjsonCreateObject();
145,271✔
166
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
145,271✔
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
145,271✔
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
145,271✔
169
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
145,271✔
170
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
145,271✔
171

172
  char path[TSDB_FILENAME_LEN];
145,215✔
173
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
145,271✔
174

175
  if (taosMulMkDir(path) != 0) {
145,271✔
176
    code = terrno;
×
177
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
178
    goto _exit;
×
179
  }
180

181
  dInfo("path %s created", path);
145,271✔
182
  
183
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
145,271✔
184
    dError("failed to write snode file since %s", tstrerror(code));
×
185
    goto _exit;
×
186
  }
187

188
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
145,271✔
189
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
65,313✔
190
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
65,313✔
191
  }
192
  smUpdateSnodeInfo(&createReq);
145,271✔
193

194
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
145,271✔
195

196
_exit:
145,215✔
197

198
  tFreeSDCreateSnodeReq(&createReq);
145,271✔
199
  
200
  return code;
145,271✔
201
}
202

203
/**
 * Handle a drop-snode request.
 *
 * Steps, in order:
 *  1. deserialize the request from pMsg;
 *  2. reject it when the local dnodeId is non-zero and differs from the requested dnodeId;
 *  3. delete all stream checkpoints;
 *  4. rewrite the snode file under "<pInput->path><TD_DIRSEP>snode<dnodeId>"
 *     with deployed = false;
 *  5. undeploy the snode tasks.
 *
 * @param pInput management options (path, name, pData->dnodeId are read).
 * @param pMsg   RPC message carrying the serialized request.
 * @return 0 on success; TSDB_CODE_INVALID_MSG when deserialization fails;
 *         TSDB_CODE_INVALID_OPTION on a dnode-id mismatch; otherwise the code
 *         returned by dmWriteFile().
 */
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
  char           path[TSDB_FILENAME_LEN];
  int32_t        code = 0;
  SDDropSnodeReq dropReq = {0};

  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    /* As before, a request that failed to deserialize is returned without being freed. */
    return TSDB_CODE_INVALID_MSG;
  }

  /* A local dnodeId of 0 accepts any request; otherwise the ids must match. */
  if (!(pInput->pData->dnodeId == 0 || dropReq.dnodeId == pInput->pData->dnodeId)) {
    code = TSDB_CODE_INVALID_OPTION;
    dError("failed to drop snode since %s", tstrerror(code));
    goto _cleanup;
  }

  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);

  streamDeleteAllCheckpoints();

  code = dmWriteFile(path, pInput->name, false);
  if (code != 0) {
    dError("failed to write snode file since %s", tstrerror(code));
    goto _cleanup;
  }

  smUndeploySnodeTasks(true);

_cleanup:
  tFreeSMCreateQnodeReq(&dropReq);
  return code;
}
236

237
SArray *smGetMsgHandles() {
550,068✔
238
  int32_t code = -1;
550,068✔
239
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
550,068✔
240
  if (pArray == NULL) goto _OVER;
550,068✔
241

242
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
550,068✔
243
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
550,068✔
244
  
245
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
550,068✔
246

247
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
550,068✔
248
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
550,068✔
249
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
550,068✔
250
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
550,068✔
251

252
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
550,068✔
253
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
550,068✔
254
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
550,068✔
255
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
550,068✔
256
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
550,068✔
257

258
  code = 0;
550,068✔
259
  
260
_OVER:
550,068✔
261
  if (code != 0) {
550,068✔
262
    taosArrayDestroy(pArray);
×
263
    return NULL;
×
264
  } else {
265
    return pArray;
550,068✔
266
  }
267
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc