• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4887

16 Dec 2025 08:27AM UTC coverage: 65.289% (-0.003%) from 65.292%
#4887

push

travis-ci

web-flow
feat[TS-7233]: audit (#33850)

377 of 536 new or added lines in 28 files covered. (70.34%)

1025 existing lines in 111 files now uncovered.

178977 of 274129 relevant lines covered (65.29%)

102580217.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.05
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
SSnodeInfo gSnode = {0};
21

22

23
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
static int32_t epToJson(const void* pObj, SJson* pJson) {
395,996✔
26
  const SEp* pNode = (const SEp*)pObj;
395,996✔
27

28
  int32_t code = tjsonAddStringToObject(pJson, "fqdn", pNode->fqdn);
395,996✔
29
  if (TSDB_CODE_SUCCESS == code) {
395,996✔
30
    code = tjsonAddIntegerToObject(pJson, "port", pNode->port);
395,996✔
31
  }
32

33
  return code;
395,996✔
34
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
8,448✔
37
  SEp* pNode = (SEp*)pObj;
8,448✔
38

39
  int32_t code = tjsonGetStringValue(pJson, "fqdn", pNode->fqdn);
8,448✔
40
  if (TSDB_CODE_SUCCESS == code) {
8,448✔
41
    code = tjsonGetSmallIntValue(pJson, "port", &pNode->port);
8,448✔
42
  }
43

44
  return code;
8,448✔
45
}
46

47

48
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
983,614✔
49
  taosWLockLatch(&gSnode.snodeLock);
983,614✔
50
  gSnode.snodeId = pReq->snodeId;
983,614✔
51
  gSnode.snodeLeaders[0] = pReq->leaders[0];
983,614✔
52
  gSnode.snodeLeaders[1] = pReq->leaders[1];  
983,614✔
53
  gSnode.snodeReplica = pReq->replica;
983,614✔
54
  taosWUnLockLatch(&gSnode.snodeLock);
983,614✔
55
}
983,614✔
56

57
SEpSet* dmGetSynEpset(int32_t leaderId) {
510,740✔
58
  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
510,740✔
59
    return &gSnode.snodeReplica.epSet;
14,553✔
60
  } 
61
  for (int32_t i = 0; i < 2; ++i) {
1,488,561✔
62
    if (gSnode.snodeLeaders[i].nodeId == leaderId) {
992,374✔
UNCOV
63
      return &gSnode.snodeLeaders[i].epSet;
×
64
    }
65
  }
66
  return NULL;
496,187✔
67
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
729,365✔
70
  SJson* pLeader0 = NULL;
729,365✔
71
  SJson* pLeader1 = NULL;
729,365✔
72
  SJson* pReplica = NULL;
729,365✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
729,365✔
74
  if (TSDB_CODE_SUCCESS == code) {
729,365✔
75
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
729,365✔
76
    if (pLeader0) {
729,365✔
77
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
4,272✔
78
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
79
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
4,272✔
80
      }
81
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
82
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
4,272✔
83
      }
84
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
85
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
4,272✔
86
      }
87
    }
88
  }
89

90
  if (TSDB_CODE_SUCCESS == code) {
729,365✔
91
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
729,365✔
92
    if (pLeader1) {
729,365✔
93
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
4,272✔
94
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
95
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
4,272✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
98
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
4,272✔
99
      }
100
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
101
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
4,272✔
102
      }
103
    }
104
  }
105

106
  if (TSDB_CODE_SUCCESS == code) {
729,365✔
107
    pReplica = tjsonGetObjectItem(pJson, "replica");
729,365✔
108
    if (pReplica) {
729,365✔
109
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
4,272✔
110
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
111
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
4,272✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
114
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
4,272✔
115
      }
116
      if (TSDB_CODE_SUCCESS == code) {
4,272✔
117
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
4,272✔
118
      }
119
    }
120
  }
121

122
  return code;
729,365✔
123
}
124

125
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
254,249✔
126
  int32_t          code = 0;
254,249✔
127
  int32_t          lino = 0;
254,249✔
128
  SDCreateSnodeReq createReq = {0};
254,249✔
129
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
254,249✔
130
    code = TSDB_CODE_INVALID_MSG;
×
131
    return code;
×
132
  }
133

134
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
254,249✔
135
    code = TSDB_CODE_INVALID_OPTION;
×
136
    dError("failed to create snode since %s", tstrerror(code));
×
137
    goto _exit;
×
138
  }
139

140
  bool deployed = true;
254,249✔
141
  SJson *pJson = tjsonCreateObject();
254,249✔
142
  if (pJson == NULL) {
254,249✔
143
    code = terrno;
×
144
    dError("failed to create json object since %s", tstrerror(code));
×
145
    goto _exit;
×
146
  }
147

148
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
254,249✔
149
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
254,249✔
150

151
  SJson *leader0 = tjsonCreateObject();
254,249✔
152
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
254,249✔
153
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
254,249✔
154
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
254,249✔
155
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
254,249✔
156
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
254,249✔
157

158
  SJson *leader1 = tjsonCreateObject();
254,249✔
159
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
254,249✔
160
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
254,249✔
161
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
254,249✔
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
254,249✔
163
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
254,249✔
164

165
  SJson *replica = tjsonCreateObject();
254,249✔
166
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
254,249✔
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
254,249✔
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
254,249✔
169
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
254,249✔
170
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
254,249✔
171

172
  char path[TSDB_FILENAME_LEN];
254,151✔
173
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
254,249✔
174

175
  if (taosMulMkDir(path) != 0) {
254,249✔
176
    code = terrno;
×
177
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
178
    goto _exit;
×
179
  }
180

181
  dInfo("path %s created", path);
254,249✔
182
  
183
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
254,249✔
184
    dError("failed to write snode file since %s", tstrerror(code));
×
185
    goto _exit;
×
186
  }
187

188
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
254,249✔
189
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
110,892✔
190
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
110,892✔
191
  }
192
  smUpdateSnodeInfo(&createReq);
254,249✔
193

194
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
254,249✔
195

196
_exit:
254,151✔
197

198
  tFreeSDCreateSnodeReq(&createReq);
254,249✔
199
  
200
  return code;
254,249✔
201
}
202

203
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
58,357✔
204
  int32_t        code = 0;
58,357✔
205
  SDDropSnodeReq dropReq = {0};
58,357✔
206
  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
58,357✔
207
    code = TSDB_CODE_INVALID_MSG;
×
208

209
    return code;
×
210
  }
211

212
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
58,357✔
213
    code = TSDB_CODE_INVALID_OPTION;
×
214
    dError("failed to drop snode since %s", tstrerror(code));
×
215
    tFreeSMCreateQnodeReq(&dropReq);
×
216
    return code;
×
217
  }
218

219
  char path[TSDB_FILENAME_LEN];
58,357✔
220
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);
58,357✔
221

222
  streamDeleteAllCheckpoints();
58,357✔
223

224
  bool deployed = false;
58,357✔
225
  if ((code = dmWriteFile(path, pInput->name, deployed)) != 0) {
58,357✔
226
    dError("failed to write snode file since %s", tstrerror(code));
×
227
    tFreeSMCreateQnodeReq(&dropReq);
×
228
    return code;
×
229
  }
230

231
  smUndeploySnodeTasks(true);
58,357✔
232

233
  tFreeSMCreateQnodeReq(&dropReq);
58,357✔
234
  return 0;
58,357✔
235
}
236

237
SArray *smGetMsgHandles() {
719,969✔
238
  int32_t code = -1;
719,969✔
239
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
719,969✔
240
  if (pArray == NULL) goto _OVER;
719,969✔
241

242
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
719,969✔
243
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
719,969✔
244
  
245
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
719,969✔
246

247
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
719,969✔
248
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
719,969✔
249
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
719,969✔
250
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
719,969✔
251

252
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
719,969✔
253
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
719,969✔
254
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
719,969✔
255
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
719,969✔
256
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
719,969✔
257

258
  code = 0;
719,969✔
259
  
260
_OVER:
719,969✔
261
  if (code != 0) {
719,969✔
262
    taosArrayDestroy(pArray);
×
263
    return NULL;
×
264
  } else {
265
    return pArray;
719,969✔
266
  }
267
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc