• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.05
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
/*
 * File-level snode state, zero-initialized at load time.
 * Within this file it is written by smUpdateSnodeInfo() (under snodeLock) and
 * read by dmGetSynEpset() and smProcessCreateReq(). Fields used here:
 * snodeLock, snodeId, snodeLeaders[0..1], snodeReplica.
 */
SSnodeInfo gSnode = {0};
21

22

23
/* Monitor-info hook for the snode. The body is empty, so neither argument is read or modified. */
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
/*
 * Serialize one endpoint into a JSON object.
 *
 * pObj  - treated as a `const SEp*`; its fqdn and port fields are written.
 * pJson - destination object; receives the keys "fqdn" and "port".
 *
 * Returns the result of the first tjson call that does not report
 * TSDB_CODE_SUCCESS, otherwise the result of writing "port".
 */
static int32_t epToJson(const void* pObj, SJson* pJson) {
  const SEp* pEp = (const SEp*)pObj;

  int32_t rc = tjsonAddStringToObject(pJson, "fqdn", pEp->fqdn);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  return tjsonAddIntegerToObject(pJson, "port", pEp->port);
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
11,447✔
37
  SEp* pNode = (SEp*)pObj;
11,447✔
38

39
  int32_t code = tjsonGetStringValue(pJson, "fqdn", pNode->fqdn);
11,447✔
40
  if (TSDB_CODE_SUCCESS == code) {
11,447!
41
    code = tjsonGetSmallIntValue(pJson, "port", &pNode->port);
11,447✔
42
  }
43

44
  return code;
11,447✔
45
}
46

47

48
/*
 * Copy the snode id, both leader entries and the replica entry from pReq into
 * the file-level gSnode, holding gSnode.snodeLock as a writer for the whole
 * update.
 */
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
  taosWLockLatch(&gSnode.snodeLock);

  gSnode.snodeId = pReq->snodeId;
  for (int32_t idx = 0; idx < 2; ++idx) {
    gSnode.snodeLeaders[idx] = pReq->leaders[idx];
  }
  gSnode.snodeReplica = pReq->replica;

  taosWUnLockLatch(&gSnode.snodeLock);
}
1,121,929✔
56

57
/*
 * Look up the endpoint set associated with leaderId in gSnode.
 *
 * - If leaderId equals gSnode.snodeId and a replica is recorded
 *   (snodeReplica.nodeId > 0), the replica's endpoint set is returned.
 * - Otherwise the first of the two leader entries whose nodeId equals
 *   leaderId is returned.
 * - NULL when nothing matches.
 *
 * The returned pointer refers to storage inside gSnode; it is not a copy.
 * NOTE(review): gSnode is read here without taking gSnode.snodeLock, while
 * smUpdateSnodeInfo() writes it under that lock — confirm callers tolerate this.
 */
SEpSet* dmGetSynEpset(int32_t leaderId) {
  SEpSet* pFound = NULL;

  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
    pFound = &gSnode.snodeReplica.epSet;
  } else {
    for (int32_t idx = 0; idx < 2 && pFound == NULL; ++idx) {
      if (gSnode.snodeLeaders[idx].nodeId == leaderId) {
        pFound = &gSnode.snodeLeaders[idx].epSet;
      }
    }
  }

  return pFound;
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
747,441✔
70
  SJson* pLeader0 = NULL;
747,441✔
71
  SJson* pLeader1 = NULL;
747,441✔
72
  SJson* pReplica = NULL;
747,441✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
747,441✔
74
  if (TSDB_CODE_SUCCESS == code) {
747,441!
75
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
747,441✔
76
    if (pLeader0) {
747,441✔
77
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
6,078✔
78
      if (TSDB_CODE_SUCCESS == code) {
6,078!
79
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
6,078✔
80
      }
81
      if (TSDB_CODE_SUCCESS == code) {
6,078!
82
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
6,078✔
83
      }
84
      if (TSDB_CODE_SUCCESS == code) {
6,078!
85
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
6,078✔
86
      }
87
    }
88
  }
89

90
  if (TSDB_CODE_SUCCESS == code) {
747,441!
91
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
747,441✔
92
    if (pLeader1) {
747,441✔
93
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
6,078✔
94
      if (TSDB_CODE_SUCCESS == code) {
6,078!
95
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
6,078✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
6,078!
98
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
6,078✔
99
      }
100
      if (TSDB_CODE_SUCCESS == code) {
6,078!
101
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
6,078✔
102
      }
103
    }
104
  }
105

106
  if (TSDB_CODE_SUCCESS == code) {
747,441!
107
    pReplica = tjsonGetObjectItem(pJson, "replica");
747,441✔
108
    if (pReplica) {
747,441✔
109
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
6,078✔
110
      if (TSDB_CODE_SUCCESS == code) {
6,078!
111
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
6,078✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
6,078!
114
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
6,078✔
115
      }
116
      if (TSDB_CODE_SUCCESS == code) {
6,078!
117
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
6,078✔
118
      }
119
    }
120
  }
121

122
  return code;
747,441✔
123
}
124

125
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
374,488✔
126
  int32_t          code = 0;
374,488✔
127
  int32_t          lino = 0;
374,488✔
128
  SDCreateSnodeReq createReq = {0};
374,488✔
129
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
374,488!
130
    code = TSDB_CODE_INVALID_MSG;
×
131
    return code;
×
132
  }
133

134
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
374,488!
135
    code = TSDB_CODE_INVALID_OPTION;
×
136
    dError("failed to create snode since %s", tstrerror(code));
×
137
    goto _exit;
×
138
  }
139

140
  bool deployed = true;
374,488✔
141
  SJson *pJson = tjsonCreateObject();
374,488✔
142
  if (pJson == NULL) {
374,488!
143
    code = terrno;
×
144
    dError("failed to create json object since %s", tstrerror(code));
×
145
    goto _exit;
×
146
  }
147

148
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
374,488!
149
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
374,488!
150

151
  SJson *leader0 = tjsonCreateObject();
374,488✔
152
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
374,488!
153
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
374,488!
154
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
374,488!
155
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
374,488!
156
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
374,488!
157

158
  SJson *leader1 = tjsonCreateObject();
374,488✔
159
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
374,488!
160
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
374,488!
161
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
374,488!
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
374,488!
163
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
374,488!
164

165
  SJson *replica = tjsonCreateObject();
374,488✔
166
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
374,488!
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
374,488!
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
374,488!
169
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
374,488!
170
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
374,488!
171

172
  char path[TSDB_FILENAME_LEN];
374,278✔
173
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
374,488!
174

175
  if (taosMulMkDir(path) != 0) {
374,488!
176
    code = terrno;
×
177
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
178
    goto _exit;
×
179
  }
180

181
  dInfo("path %s created", path);
374,488!
182
  
183
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
374,488!
184
    dError("failed to write snode file since %s", tstrerror(code));
×
185
    goto _exit;
×
186
  }
187

188
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
374,488✔
189
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
167,758✔
190
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
167,758!
191
  }
192
  smUpdateSnodeInfo(&createReq);
374,488✔
193

194
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
374,488!
195

196
_exit:
374,278✔
197

198
  tFreeSDCreateSnodeReq(&createReq);
374,488✔
199
  
200
  return code;
374,488✔
201
}
202

203
/*
 * Handle a drop-snode request.
 *
 * Steps: deserialize the request, reject it when it targets a different dnode,
 * delete all stream checkpoints, rewrite the snode file under
 * "<pInput->path><TD_DIRSEP>snode<dnodeId>" with deployed=false, then undeploy
 * the snode tasks.
 *
 * Returns 0 on success; TSDB_CODE_INVALID_MSG when deserialization fails
 * (the request is not freed on that path); TSDB_CODE_INVALID_OPTION on a dnode
 * id mismatch; otherwise the code returned by dmWriteFile.
 */
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
  int32_t        code = 0;
  SDDropSnodeReq dropReq = {0};
  char           path[TSDB_FILENAME_LEN];

  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
    code = TSDB_CODE_INVALID_OPTION;
    dError("failed to drop snode since %s", tstrerror(code));
    goto _cleanup;
  }

  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);

  streamDeleteAllCheckpoints();

  code = dmWriteFile(path, pInput->name, false);
  if (code != 0) {
    dError("failed to write snode file since %s", tstrerror(code));
    goto _cleanup;
  }

  smUndeploySnodeTasks(true);

_cleanup:
  // Shared tail: the request is released on every path past deserialization.
  tFreeSMCreateQnodeReq(&dropReq);
  return code;
}
236

237
SArray *smGetMsgHandles() {
746,863✔
238
  int32_t code = -1;
746,863✔
239
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
746,863✔
240
  if (pArray == NULL) goto _OVER;
746,863!
241

242
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
746,863!
243
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
746,863!
244
  
245
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
746,863!
246

247
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
746,863!
248
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
746,863!
249
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
746,863!
250
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
746,863!
251

252
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
746,863!
253
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
746,863!
254
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
746,863!
255
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
746,863!
256
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
746,863!
257

258
  code = 0;
746,863✔
259
  
260
_OVER:
746,863✔
261
  if (code != 0) {
746,863!
262
    taosArrayDestroy(pArray);
×
263
    return NULL;
×
264
  } else {
265
    return pArray;
746,863✔
266
  }
267
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc