• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4971

28 Feb 2026 08:05AM UTC coverage: 67.671% (-0.04%) from 67.707%
#4971

push

travis-ci

web-flow
fix(planner): disable project block merge in non-top-level subplans (#34617)

208281 of 307783 relevant lines covered (67.67%)

130135765.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
SSnodeInfo gSnode = {0};
21

22

23
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
static int32_t epToJson(const void* pObj, SJson* pJson) {
237,611✔
26
  const SEp* pNode = (const SEp*)pObj;
237,611✔
27

28
  int32_t code = tjsonAddStringToObject(pJson, "fqdn", pNode->fqdn);
237,611✔
29
  if (TSDB_CODE_SUCCESS == code) {
237,611✔
30
    code = tjsonAddIntegerToObject(pJson, "port", pNode->port);
237,611✔
31
  }
32

33
  return code;
237,611✔
34
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
5,158✔
37
  SEp* pNode = (SEp*)pObj;
5,158✔
38

39
  int32_t code = tjsonGetStringValue(pJson, "fqdn", pNode->fqdn);
5,158✔
40
  if (TSDB_CODE_SUCCESS == code) {
5,158✔
41
    code = tjsonGetSmallIntValue(pJson, "port", &pNode->port);
5,158✔
42
  }
43

44
  return code;
5,158✔
45
}
46

47

48
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
751,086✔
49
  taosWLockLatch(&gSnode.snodeLock);
751,086✔
50
  gSnode.snodeId = pReq->snodeId;
751,086✔
51
  gSnode.snodeLeaders[0] = pReq->leaders[0];
751,086✔
52
  gSnode.snodeLeaders[1] = pReq->leaders[1];  
751,086✔
53
  gSnode.snodeReplica = pReq->replica;
751,086✔
54
  taosWUnLockLatch(&gSnode.snodeLock);
751,086✔
55
}
751,086✔
56

57
SEpSet* dmGetSynEpset(int32_t leaderId) {
415,827✔
58
  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
415,827✔
59
    return &gSnode.snodeReplica.epSet;
7,888✔
60
  } 
61
  for (int32_t i = 0; i < 2; ++i) {
1,223,817✔
62
    if (gSnode.snodeLeaders[i].nodeId == leaderId) {
815,878✔
63
      return &gSnode.snodeLeaders[i].epSet;
×
64
    }
65
  }
66
  return NULL;
407,939✔
67
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
597,846✔
70
  SJson* pLeader0 = NULL;
597,846✔
71
  SJson* pLeader1 = NULL;
597,846✔
72
  SJson* pReplica = NULL;
597,846✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
597,846✔
74

75
  // Read encrypted flag (optional, defaults to false for backward compatibility)
76
  int32_t encrypted = 0;
597,846✔
77
  if (TSDB_CODE_SUCCESS == code) {
597,846✔
78
    code = tjsonGetIntValue(pJson, "encrypted", &encrypted);
597,846✔
79
    if (TSDB_CODE_SUCCESS == code) {
597,846✔
80
      // Update global snode encrypted flag
81
      taosWLockLatch(&gSnode.snodeLock);
597,846✔
82
      gSnode.encrypted = encrypted;
597,846✔
83
      taosWUnLockLatch(&gSnode.snodeLock);
597,846✔
84
    }
85
  }
86

87
  if (TSDB_CODE_SUCCESS == code) {
597,846✔
88
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
597,846✔
89
    if (pLeader0) {
597,846✔
90
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
3,271✔
91
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
92
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
3,271✔
93
      }
94
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
95
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
3,271✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
98
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
3,271✔
99
      }
100
    }
101
  }
102

103
  if (TSDB_CODE_SUCCESS == code) {
597,846✔
104
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
597,846✔
105
    if (pLeader1) {
597,846✔
106
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
3,271✔
107
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
108
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
3,271✔
109
      }
110
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
111
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
3,271✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
114
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
3,271✔
115
      }
116
    }
117
  }
118

119
  if (TSDB_CODE_SUCCESS == code) {
597,846✔
120
    pReplica = tjsonGetObjectItem(pJson, "replica");
597,846✔
121
    if (pReplica) {
597,846✔
122
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
3,271✔
123
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
124
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
3,271✔
125
      }
126
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
127
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
3,271✔
128
      }
129
      if (TSDB_CODE_SUCCESS == code) {
3,271✔
130
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
3,271✔
131
      }
132
    }
133
  }
134

135
  return code;
597,846✔
136
}
137

138
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
153,240✔
139
  int32_t          code = 0;
153,240✔
140
  int32_t          lino = 0;
153,240✔
141
  SDCreateSnodeReq createReq = {0};
153,240✔
142
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
153,240✔
143
    code = TSDB_CODE_INVALID_MSG;
×
144
    return code;
×
145
  }
146

147
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
153,240✔
148
    code = TSDB_CODE_INVALID_OPTION;
×
149
    dError("failed to create snode since %s", tstrerror(code));
×
150
    goto _exit;
×
151
  }
152

153
  bool deployed = true;
153,240✔
154
  SJson *pJson = tjsonCreateObject();
153,240✔
155
  if (pJson == NULL) {
153,240✔
156
    code = terrno;
×
157
    dError("failed to create json object since %s", tstrerror(code));
×
158
    goto _exit;
×
159
  }
160

161
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
153,240✔
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
153,240✔
163

164
  SJson *leader0 = tjsonCreateObject();
153,240✔
165
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
153,240✔
166
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
153,240✔
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
153,240✔
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
153,240✔
169
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
153,240✔
170

171
  SJson *leader1 = tjsonCreateObject();
153,240✔
172
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
153,240✔
173
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
153,240✔
174
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
153,240✔
175
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
153,240✔
176
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
153,240✔
177

178
  SJson *replica = tjsonCreateObject();
153,240✔
179
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
153,240✔
180
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
153,240✔
181
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
153,240✔
182
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
153,240✔
183
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
153,240✔
184

185
  char path[TSDB_FILENAME_LEN];
153,178✔
186
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
153,240✔
187

188
  if (taosMulMkDir(path) != 0) {
153,240✔
189
    code = terrno;
×
190
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
191
    goto _exit;
×
192
  }
193

194
  dInfo("path %s created", path);
153,240✔
195
  
196
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
153,240✔
197
    dError("failed to write snode file since %s", tstrerror(code));
×
198
    goto _exit;
×
199
  }
200

201
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
153,240✔
202
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
68,129✔
203
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
68,129✔
204
  }
205
  smUpdateSnodeInfo(&createReq);
153,240✔
206

207
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
153,240✔
208

209
_exit:
153,178✔
210

211
  tFreeSDCreateSnodeReq(&createReq);
153,240✔
212
  
213
  return code;
153,240✔
214
}
215

216
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
34,166✔
217
  int32_t        code = 0;
34,166✔
218
  SDDropSnodeReq dropReq = {0};
34,166✔
219
  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
34,166✔
220
    code = TSDB_CODE_INVALID_MSG;
×
221

222
    return code;
×
223
  }
224

225
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
34,166✔
226
    code = TSDB_CODE_INVALID_OPTION;
×
227
    dError("failed to drop snode since %s", tstrerror(code));
×
228
    tFreeSMCreateQnodeReq(&dropReq);
×
229
    return code;
×
230
  }
231

232
  char path[TSDB_FILENAME_LEN];
34,166✔
233
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);
34,166✔
234

235
  streamDeleteAllCheckpoints();
34,166✔
236

237
  bool deployed = false;
34,166✔
238
  if ((code = dmWriteFile(path, pInput->name, deployed)) != 0) {
34,166✔
239
    dError("failed to write snode file since %s", tstrerror(code));
×
240
    tFreeSMCreateQnodeReq(&dropReq);
×
241
    return code;
×
242
  }
243

244
  smUndeploySnodeTasks(true);
34,166✔
245

246
  tFreeSMCreateQnodeReq(&dropReq);
34,166✔
247
  return 0;
34,166✔
248
}
249

250
SArray *smGetMsgHandles() {
597,542✔
251
  int32_t code = -1;
597,542✔
252
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
597,542✔
253
  if (pArray == NULL) goto _OVER;
597,542✔
254

255
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
597,542✔
256
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
597,542✔
257
  
258
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
597,542✔
259

260
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
597,542✔
261
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
597,542✔
262
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
597,542✔
263
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
597,542✔
264

265
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
597,542✔
266
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
597,542✔
267
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
597,542✔
268
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
597,542✔
269
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
597,542✔
270

271
  code = 0;
597,542✔
272
  
273
_OVER:
597,542✔
274
  if (code != 0) {
597,542✔
275
    taosArrayDestroy(pArray);
×
276
    return NULL;
×
277
  } else {
278
    return pArray;
597,542✔
279
  }
280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc