• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4908

30 Dec 2025 10:52AM UTC coverage: 65.386% (-0.2%) from 65.541%
#4908

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

1330 existing lines in 113 files now uncovered.

193461 of 295877 relevant lines covered (65.39%)

115765274.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.05
/source/dnode/mgmt/mgmt_snode/src/smHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
SSnodeInfo gSnode = {0};
21

22

23
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
×
24

25
static int32_t epToJson(const void* pObj, SJson* pJson) {
248,100✔
26
  const SEp* pNode = (const SEp*)pObj;
248,100✔
27

28
  int32_t code = tjsonAddStringToObject(pJson, "fqdn", pNode->fqdn);
248,100✔
29
  if (TSDB_CODE_SUCCESS == code) {
248,100✔
30
    code = tjsonAddIntegerToObject(pJson, "port", pNode->port);
248,100✔
31
  }
32

33
  return code;
248,100✔
34
}
35

36
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
4,600✔
37
  SEp* pNode = (SEp*)pObj;
4,600✔
38

39
  int32_t code = tjsonGetStringValue(pJson, "fqdn", pNode->fqdn);
4,600✔
40
  if (TSDB_CODE_SUCCESS == code) {
4,600✔
41
    code = tjsonGetSmallIntValue(pJson, "port", &pNode->port);
4,600✔
42
  }
43

44
  return code;
4,600✔
45
}
46

47

48
void smUpdateSnodeInfo(SDCreateSnodeReq* pReq) {
727,283✔
49
  taosWLockLatch(&gSnode.snodeLock);
727,283✔
50
  gSnode.snodeId = pReq->snodeId;
727,283✔
51
  gSnode.snodeLeaders[0] = pReq->leaders[0];
727,283✔
52
  gSnode.snodeLeaders[1] = pReq->leaders[1];  
727,283✔
53
  gSnode.snodeReplica = pReq->replica;
727,283✔
54
  taosWUnLockLatch(&gSnode.snodeLock);
727,283✔
55
}
727,283✔
56

57
SEpSet* dmGetSynEpset(int32_t leaderId) {
334,659✔
58
  if (gSnode.snodeId == leaderId && gSnode.snodeReplica.nodeId > 0) {
334,659✔
59
    return &gSnode.snodeReplica.epSet;
5,991✔
60
  } 
61
  for (int32_t i = 0; i < 2; ++i) {
986,004✔
62
    if (gSnode.snodeLeaders[i].nodeId == leaderId) {
657,336✔
UNCOV
63
      return &gSnode.snodeLeaders[i].epSet;
×
64
    }
65
  }
66
  return NULL;
328,668✔
67
}
68

69
int32_t smBuildCreateReqFromJson(SJson *pJson, SDCreateSnodeReq *pReq) {
570,953✔
70
  SJson* pLeader0 = NULL;
570,953✔
71
  SJson* pLeader1 = NULL;
570,953✔
72
  SJson* pReplica = NULL;
570,953✔
73
  int32_t code = tjsonGetIntValue(pJson, "snodeId", &pReq->snodeId);
570,953✔
74
  if (TSDB_CODE_SUCCESS == code) {
570,953✔
75
    pLeader0 = tjsonGetObjectItem(pJson, "leader0");
570,953✔
76
    if (pLeader0) {
570,953✔
77
      code = tjsonGetIntValue(pLeader0, "nodeId", &pReq->leaders[0].nodeId);
3,019✔
78
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
79
        code = tjsonGetTinyIntValue(pLeader0, "inUse", &pReq->leaders[0].epSet.inUse);
3,019✔
80
      }
81
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
82
        code = tjsonGetTinyIntValue(pLeader0, "numOfEps", &pReq->leaders[0].epSet.numOfEps);
3,019✔
83
      }
84
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
85
        code = tjsonToArray(pLeader0, "eps", jsonToEp, pReq->leaders[0].epSet.eps, sizeof(SEp));
3,019✔
86
      }
87
    }
88
  }
89

90
  if (TSDB_CODE_SUCCESS == code) {
570,953✔
91
    pLeader1 = tjsonGetObjectItem(pJson, "leader1");
570,953✔
92
    if (pLeader1) {
570,953✔
93
      code = tjsonGetIntValue((pLeader1), "nodeId", &pReq->leaders[1].nodeId);
3,019✔
94
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
95
        code = tjsonGetTinyIntValue((pLeader1), "inUse", &pReq->leaders[1].epSet.inUse);
3,019✔
96
      }
97
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
98
        code = tjsonGetTinyIntValue((pLeader1), "numOfEps", &pReq->leaders[1].epSet.numOfEps);
3,019✔
99
      }
100
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
101
        code = tjsonToArray(pLeader1, "eps", jsonToEp, pReq->leaders[1].epSet.eps, sizeof(SEp));
3,019✔
102
      }
103
    }
104
  }
105

106
  if (TSDB_CODE_SUCCESS == code) {
570,953✔
107
    pReplica = tjsonGetObjectItem(pJson, "replica");
570,953✔
108
    if (pReplica) {
570,953✔
109
      code = tjsonGetIntValue((pReplica), "nodeId", &pReq->replica.nodeId);
3,019✔
110
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
111
        code = tjsonGetTinyIntValue((pReplica), "inUse", &pReq->replica.epSet.inUse);
3,019✔
112
      }
113
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
114
        code = tjsonGetTinyIntValue((pReplica), "numOfEps", &pReq->replica.epSet.numOfEps);
3,019✔
115
      }
116
      if (TSDB_CODE_SUCCESS == code) {
3,019✔
117
        code = tjsonToArray(pReplica, "eps", jsonToEp, pReq->replica.epSet.eps, sizeof(SEp));
3,019✔
118
      }
119
    }
120
  }
121

122
  return code;
570,953✔
123
}
124

125
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
156,330✔
126
  int32_t          code = 0;
156,330✔
127
  int32_t          lino = 0;
156,330✔
128
  SDCreateSnodeReq createReq = {0};
156,330✔
129
  if (tDeserializeSDCreateSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
156,330✔
130
    code = TSDB_CODE_INVALID_MSG;
×
131
    return code;
×
132
  }
133

134
  if (pInput->pData->dnodeId != 0 && createReq.snodeId != pInput->pData->dnodeId) {
156,330✔
135
    code = TSDB_CODE_INVALID_OPTION;
×
136
    dError("failed to create snode since %s", tstrerror(code));
×
137
    goto _exit;
×
138
  }
139

140
  bool deployed = true;
156,330✔
141
  SJson *pJson = tjsonCreateObject();
156,330✔
142
  if (pJson == NULL) {
156,330✔
143
    code = terrno;
×
144
    dError("failed to create json object since %s", tstrerror(code));
×
145
    goto _exit;
×
146
  }
147

148
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "deployed", deployed));
156,330✔
149
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "snodeId", createReq.snodeId));
156,330✔
150

151
  SJson *leader0 = tjsonCreateObject();
156,330✔
152
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader0", leader0));
156,330✔
153
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "nodeId", createReq.leaders[0].nodeId));
156,330✔
154
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "inUse", createReq.leaders[0].epSet.inUse));
156,330✔
155
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader0, "numOfEps", createReq.leaders[0].epSet.numOfEps));
156,330✔
156
  TAOS_CHECK_EXIT(tjsonAddArray(leader0, "eps", epToJson, createReq.leaders[0].epSet.eps, sizeof(SEp), createReq.leaders[0].epSet.numOfEps));
156,330✔
157

158
  SJson *leader1 = tjsonCreateObject();
156,330✔
159
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "leader1", leader1));
156,330✔
160
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "nodeId", createReq.leaders[1].nodeId));
156,330✔
161
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "inUse", createReq.leaders[1].epSet.inUse));
156,330✔
162
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(leader1, "numOfEps", createReq.leaders[1].epSet.numOfEps));
156,330✔
163
  TAOS_CHECK_EXIT(tjsonAddArray(leader1, "eps", epToJson, createReq.leaders[1].epSet.eps, sizeof(SEp), createReq.leaders[1].epSet.numOfEps));
156,330✔
164

165
  SJson *replica = tjsonCreateObject();
156,330✔
166
  TAOS_CHECK_EXIT(tjsonAddItemToObject(pJson, "replica", replica));
156,330✔
167
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "nodeId", createReq.replica.nodeId));
156,330✔
168
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "inUse", createReq.replica.epSet.inUse));
156,330✔
169
  TAOS_CHECK_EXIT(tjsonAddIntegerToObject(replica, "numOfEps", createReq.replica.epSet.numOfEps));
156,330✔
170
  TAOS_CHECK_EXIT(tjsonAddArray(replica, "eps", epToJson, createReq.replica.epSet.eps, sizeof(SEp), createReq.replica.epSet.numOfEps));
156,330✔
171

172
  char path[TSDB_FILENAME_LEN];
156,264✔
173
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, createReq.snodeId);
156,330✔
174

175
  if (taosMulMkDir(path) != 0) {
156,330✔
176
    code = terrno;
×
177
    dError("failed to create dir:%s since %s", path, tstrerror(code));
×
178
    goto _exit;
×
179
  }
180

181
  dInfo("path %s created", path);
156,330✔
182
  
183
  if ((code = dmWriteFileJson(path, pInput->name, pJson)) != 0) {
156,330✔
184
    dError("failed to write snode file since %s", tstrerror(code));
×
185
    goto _exit;
×
186
  }
187

188
  if (createReq.replica.nodeId != gSnode.snodeReplica.nodeId && createReq.replica.nodeId != 0) {
156,330✔
189
    int32_t ret = streamSyncAllCheckpoints(&createReq.replica.epSet);
69,837✔
190
    dInfo("[checkpoint] sync all checkpoint from snode %d to replicaId:%d, return:%d", createReq.snodeId, createReq.replica.nodeId, ret);
69,837✔
191
  }
192
  smUpdateSnodeInfo(&createReq);
156,330✔
193

194
  dInfo("snode %d created, replicaId:%d", createReq.snodeId, createReq.replica.nodeId);
156,330✔
195

196
_exit:
156,264✔
197

198
  tFreeSDCreateSnodeReq(&createReq);
156,330✔
199
  
200
  return code;
156,330✔
201
}
202

203
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
35,946✔
204
  int32_t        code = 0;
35,946✔
205
  SDDropSnodeReq dropReq = {0};
35,946✔
206
  if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
35,946✔
207
    code = TSDB_CODE_INVALID_MSG;
×
208

209
    return code;
×
210
  }
211

212
  if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
35,946✔
213
    code = TSDB_CODE_INVALID_OPTION;
×
214
    dError("failed to drop snode since %s", tstrerror(code));
×
215
    tFreeSMCreateQnodeReq(&dropReq);
×
216
    return code;
×
217
  }
218

219
  char path[TSDB_FILENAME_LEN];
35,946✔
220
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, dropReq.dnodeId);
35,946✔
221

222
  streamDeleteAllCheckpoints();
35,946✔
223

224
  bool deployed = false;
35,946✔
225
  if ((code = dmWriteFile(path, pInput->name, deployed)) != 0) {
35,946✔
226
    dError("failed to write snode file since %s", tstrerror(code));
×
227
    tFreeSMCreateQnodeReq(&dropReq);
×
228
    return code;
×
229
  }
230

231
  smUndeploySnodeTasks(true);
35,946✔
232

233
  tFreeSMCreateQnodeReq(&dropReq);
35,946✔
234
  return 0;
35,946✔
235
}
236

237
SArray *smGetMsgHandles() {
561,214✔
238
  int32_t code = -1;
561,214✔
239
  SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
561,214✔
240
  if (pArray == NULL) goto _OVER;
561,214✔
241

242
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
561,214✔
243
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SYNC_CHECKPOINT_RSP, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
561,214✔
244
  
245
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_DELETE_CHECKPOINT, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
561,214✔
246

247
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
561,214✔
248
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_RUNNER, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
561,214✔
249
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH_FROM_CACHE, smPutMsgToRunnerQueue, 0) == NULL) goto _OVER;
561,214✔
250
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP, smPutMsgToRunnerQueue, 1) == NULL) goto _OVER;
561,214✔
251

252
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
561,214✔
253
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_CALC_RSP, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
561,214✔
254
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_DROP_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
561,214✔
255
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE_RSP, smPutMsgToTriggerQueue, 1) == NULL) goto _OVER;
561,214✔
256
  if (dmSetMgmtHandle(pArray, TDMT_SND_BATCH_META, smPutMsgToTriggerQueue, 0) == NULL) goto _OVER;
561,214✔
257

258
  code = 0;
561,214✔
259
  
260
_OVER:
561,214✔
261
  if (code != 0) {
561,214✔
262
    taosArrayDestroy(pArray);
×
263
    return NULL;
×
264
  } else {
265
    return pArray;
561,214✔
266
  }
267
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc