• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4797

16 Oct 2025 01:24AM UTC coverage: 61.083% (+0.2%) from 60.915%
#4797

push

travis-ci

web-flow
Merge 9f5f33536 into 19574fe21

155292 of 324369 branches covered (47.88%)

Branch coverage included in aggregate %.

79 of 100 new or added lines in 19 files covered. (79.0%)

2515 existing lines in 105 files now uncovered.

207484 of 269534 relevant lines covered (76.98%)

126629055.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.4
/source/dnode/mnode/impl/src/mndSnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndDnode.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndSnode.h"
21
#include "mndTrans.h"
22
#include "mndUser.h"
23
#include "mndStream.h"
24

25
#define SNODE_VER_NUMBER   2
26
#define SNODE_RESERVE_SIZE 64
27

28
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
29
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw);
30
static int32_t  mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
31
static int32_t  mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
32
static int32_t  mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
33
static int32_t  mndProcessCreateSnodeReq(SRpcMsg *pReq);
34
static int32_t  mndProcessDropSnodeReq(SRpcMsg *pReq);
35
static int32_t  mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
36
static void     mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
37

38
int32_t mndInitSnode(SMnode *pMnode) {
553,297✔
39
  SSdbTable table = {
553,297✔
40
      .sdbType = SDB_SNODE,
41
      .keyType = SDB_KEY_INT32,
42
      .encodeFp = (SdbEncodeFp)mndSnodeActionEncode,
43
      .decodeFp = (SdbDecodeFp)mndSnodeActionDecode,
44
      .insertFp = (SdbInsertFp)mndSnodeActionInsert,
45
      .updateFp = (SdbUpdateFp)mndSnodeActionUpdate,
46
      .deleteFp = (SdbDeleteFp)mndSnodeActionDelete,
47
  };
48

49
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq);
553,297✔
50
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq);
553,297✔
51
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndTransProcessRsp);
553,297✔
52
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_SNODE_RSP, mndTransProcessRsp);
553,297✔
53
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndTransProcessRsp);
553,297✔
54

55
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
553,297✔
56
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);
553,297✔
57

58
  return sdbSetTable(pMnode->pSdb, table);
553,297✔
59
}
60

61
void mndCleanupSnode(SMnode *pMnode) {}
553,075✔
62

63
SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode) {
×
64
  SEpSet epSet = {.numOfEps = 1, .inUse = 0};
×
65
  memcpy(epSet.eps[0].fqdn, pSnode->pDnode->fqdn, TSDB_FQDN_LEN);
×
66
  epSet.eps[0].port = pSnode->pDnode->port;
×
67
  return epSet;
×
68
}
69

70
SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) {
1,232,834✔
71
  SSnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId);
1,232,834✔
72
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
1,232,834!
73
    terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
180,987✔
74
  }
75
  return pObj;
1,232,834✔
76
}
77

78
void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) {
1,109,990✔
79
  SSdb *pSdb = pMnode->pSdb;
1,109,990✔
80
  sdbRelease(pSdb, pObj);
1,109,990✔
81
}
1,109,990✔
82

83
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) {
1,147,890✔
84
  int32_t code = 0;
1,147,890✔
85
  int32_t lino = 0;
1,147,890✔
86
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,147,890✔
87

88
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, SNODE_VER_NUMBER, sizeof(SSnodeObj) + SNODE_RESERVE_SIZE);
1,147,890✔
89
  if (pRaw == NULL) goto _OVER;
1,147,890!
90

91
  int32_t dataPos = 0;
1,147,890✔
92
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
1,147,890!
93
  SDB_SET_INT32(pRaw, dataPos, pObj->leadersId[0], _OVER)
1,147,890!
94
  SDB_SET_INT32(pRaw, dataPos, pObj->leadersId[1], _OVER)
1,147,890!
95
  SDB_SET_INT32(pRaw, dataPos, pObj->replicaId, _OVER)
1,147,890!
96
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
1,147,890!
97
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
1,147,890!
98
  SDB_SET_RESERVE(pRaw, dataPos, SNODE_RESERVE_SIZE, _OVER)
1,147,890!
99

100
  terrno = 0;
1,147,890✔
101

102
_OVER:
1,147,890✔
103
  if (terrno != 0) {
1,147,890!
104
    mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
×
105
    sdbFreeRaw(pRaw);
×
106
    return NULL;
×
107
  }
108

109
  mTrace("snode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
1,147,890!
110
  return pRaw;
1,147,890✔
111
}
112

113
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
952,188✔
114
  int32_t code = 0;
952,188✔
115
  int32_t lino = 0;
952,188✔
116
  terrno = TSDB_CODE_OUT_OF_MEMORY;
952,188✔
117
  SSdbRow   *pRow = NULL;
952,188✔
118
  SSnodeObj *pObj = NULL;
952,188✔
119

120
  int8_t sver = 0;
952,188✔
121
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
952,188!
122

123
  if (sver != SNODE_VER_NUMBER) {
952,188!
124
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
125
    goto _OVER;
×
126
  }
127

128
  pRow = sdbAllocRow(sizeof(SSnodeObj));
952,188✔
129
  if (pRow == NULL) goto _OVER;
952,188!
130

131
  pObj = sdbGetRowObj(pRow);
952,188✔
132
  if (pObj == NULL) goto _OVER;
952,188!
133

134
  int32_t dataPos = 0;
952,188✔
135
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
952,188!
136
  SDB_GET_INT32(pRaw, dataPos, &pObj->leadersId[0], _OVER)
952,188!
137
  SDB_GET_INT32(pRaw, dataPos, &pObj->leadersId[1], _OVER)
952,188!
138
  SDB_GET_INT32(pRaw, dataPos, &pObj->replicaId, _OVER)
952,188!
139
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
952,188!
140
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
952,188!
141
  SDB_GET_RESERVE(pRaw, dataPos, SNODE_RESERVE_SIZE, _OVER)
952,188!
142

143
  terrno = 0;
952,188✔
144

145
_OVER:
952,188✔
146
  if (terrno != 0) {
952,188!
147
    mError("snode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
×
148
    taosMemoryFreeClear(pRow);
×
149
    return NULL;
×
150
  }
151

152
  mDebug("snode:%d, decode from raw:%p, row:%p, leaderId[0]:%d, leaderId[1]:%d, replicaId:%d", pObj->id, pRaw, pObj, pObj->leadersId[0], pObj->leadersId[1], pObj->replicaId);
952,188!
153
  return pRow;
952,188✔
154
}
155

156
static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) {
135,126✔
157
  mTrace("snode:%d, perform insert action, row:%p", pObj->id, pObj);
135,126!
158
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
135,126✔
159
  if (pObj->pDnode == NULL) {
135,126!
160
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
×
161
    mError("snode:%d, failed to perform insert action since %s", pObj->id, terrstr());
×
162
    return -1;
×
163
  }
164

165
  return 0;
135,126✔
166
}
167

168
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
952,188✔
169
  mTrace("snode:%d, perform delete action, row:%p", pObj->id, pObj);
952,188!
170
  if (pObj->pDnode != NULL) {
952,188✔
171
    sdbRelease(pSdb, pObj->pDnode);
135,126✔
172
    pObj->pDnode = NULL;
135,126✔
173
  }
174

175
  return 0;
952,188✔
176
}
177

178
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
726,440✔
179
  mDebug("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
726,440!
180
  pOld->leadersId[0] = pNew->leadersId[0];
726,440✔
181
  pOld->leadersId[1] = pNew->leadersId[1];
726,440✔
182
  pOld->replicaId = pNew->replicaId;
726,440✔
183
  pOld->updateTime = pNew->updateTime;
726,440✔
184
  return 0;
726,440✔
185
}
186

187
static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
129,504✔
188
  int32_t  code = 0;
129,504✔
189
  SSdbRaw *pRedoRaw = mndSnodeActionEncode(pObj);
129,504✔
190
  if (pRedoRaw == NULL) {
129,504!
191
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
192
    if (terrno != 0) code = terrno;
×
193
    TAOS_RETURN(code);
×
194
  }
195
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
129,504!
196
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
129,504!
197
  TAOS_RETURN(code);
129,504✔
198
}
199

200
static int32_t mndSetUpdateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
253,157✔
201
  int32_t  code = 0;
253,157✔
202
  SSdbRaw *pRedoRaw = mndSnodeActionEncode(pObj);
253,157✔
203
  if (pRedoRaw == NULL) {
253,157!
204
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
205
    if (terrno != 0) code = terrno;
×
206
    TAOS_RETURN(code);
×
207
  }
208
  TAOS_CHECK_RETURN(mndTransAppendGroupRedolog(pTrans, pRedoRaw, -1));
253,157!
209
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
253,157!
210
  TAOS_RETURN(code);
253,157✔
211
}
212

213

214
static int32_t mndSetCreateSnodeUndoLogs(STrans *pTrans, SSnodeObj *pObj) {
129,504✔
215
  int32_t  code = 0;
129,504✔
216
  SSdbRaw *pUndoRaw = mndSnodeActionEncode(pObj);
129,504✔
217
  if (pUndoRaw == NULL) {
129,504!
218
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
219
    if (terrno != 0) code = terrno;
×
220
    TAOS_RETURN(code);
×
221
  }
222
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
129,504!
223
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
129,504!
224
  TAOS_RETURN(code);
129,504✔
225
}
226

227
static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
382,661✔
228
  int32_t  code = 0;
382,661✔
229
  SSdbRaw *pCommitRaw = mndSnodeActionEncode(pObj);
382,661✔
230
  if (pCommitRaw == NULL) {
382,661!
231
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
232
    if (terrno != 0) code = terrno;
×
233
    TAOS_RETURN(code);
×
234
  }
235
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
382,661!
236
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
382,661!
237
  TAOS_RETURN(code);
382,661✔
238
}
239

240
static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj, SMnode *pMnode) {
129,504✔
241
  int32_t          code = 0;
129,504✔
242
  SDCreateSnodeReq createReq = {0};
129,504✔
243
  createReq.snodeId = pDnode->id;
129,504✔
244
  createReq.leaders[0].nodeId = pObj->leadersId[0];
129,504✔
245
  if (createReq.leaders[0].nodeId > 0) {
129,504✔
246
    createReq.leaders[0].epSet = mndGetDnodeEpsetById(pMnode, createReq.leaders[0].nodeId);
7,349✔
247
  }
248
  createReq.leaders[1].nodeId = pObj->leadersId[1];
129,504✔
249
  if (createReq.leaders[1].nodeId > 0) {
129,504!
250
    createReq.leaders[1].epSet = mndGetDnodeEpsetById(pMnode, createReq.leaders[1].nodeId);
×
251
  }
252
  createReq.replica.nodeId = pObj->replicaId;
129,504✔
253
  if (createReq.replica.nodeId > 0) {
129,504✔
254
    createReq.replica.epSet = mndGetDnodeEpsetById(pMnode, createReq.replica.nodeId);
93,705✔
255
  }
256

257
  int32_t contLen = tSerializeSDCreateSNodeReq(NULL, 0, &createReq);
129,504✔
258
  void   *pReq = taosMemoryMalloc(contLen);
129,504!
259
  if (pReq == NULL) {
129,504!
260
    code = terrno;
×
261
    TAOS_RETURN(code);
×
262
  }
263
  code = tSerializeSDCreateSNodeReq(pReq, contLen, &createReq);
129,504✔
264
  if (code < 0) {
129,504!
265
    mError("snode:%d, failed to serialize create drop snode request since %s", createReq.snodeId, terrstr());
×
266
  }
267

268
  STransAction action = {0};
129,504✔
269
  action.epSet = mndGetDnodeEpset(pDnode);
129,504✔
270
  action.pCont = pReq;
129,504✔
271
  action.contLen = contLen;
129,504✔
272
  action.msgType = TDMT_DND_CREATE_SNODE;
129,504✔
273
  action.acceptableCode = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
129,504✔
274

275
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
129,504!
276
    taosMemoryFree(pReq);
×
277
    TAOS_RETURN(code);
×
278
  }
279

280
  TAOS_RETURN(code);
129,504✔
281
}
282

283
static int32_t mndSetUpdateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj, SMnode *pMnode) {
253,157✔
284
  int32_t          code = 0;
253,157✔
285
  SDCreateSnodeReq createReq = {0};
253,157✔
286
  createReq.snodeId = pDnode->id;
253,157✔
287
  createReq.leaders[0].nodeId = pObj->leadersId[0];
253,157✔
288
  if (createReq.leaders[0].nodeId > 0) {
253,157✔
289
    createReq.leaders[0].epSet = mndGetDnodeEpsetById(pMnode, createReq.leaders[0].nodeId);
226,185✔
290
  }
291
  createReq.leaders[1].nodeId = pObj->leadersId[1];
253,157✔
292
  if (createReq.leaders[1].nodeId > 0) {
253,157✔
293
    createReq.leaders[1].epSet = mndGetDnodeEpsetById(pMnode, createReq.leaders[1].nodeId);
46,068✔
294
  }
295
  createReq.replica.nodeId = pObj->replicaId;
253,157✔
296
  if (createReq.replica.nodeId > 0) {
253,157✔
297
    createReq.replica.epSet = mndGetDnodeEpsetById(pMnode, createReq.replica.nodeId);
248,543✔
298
  }
299

300
  int32_t contLen = tSerializeSDCreateSNodeReq(NULL, 0, &createReq);
253,157✔
301
  void   *pReq = taosMemoryMalloc(contLen);
253,157!
302
  if (pReq == NULL) {
253,157!
303
    code = terrno;
×
304
    TAOS_RETURN(code);
×
305
  }
306
  code = tSerializeSDCreateSNodeReq(pReq, contLen, &createReq);
253,157✔
307
  if (code < 0) {
253,157!
308
    mError("snode:%d, failed to serialize create drop snode request since %s", createReq.snodeId, terrstr());
×
309
  }
310

311
  STransAction action = {0};
253,157✔
312
  action.epSet = mndGetDnodeEpset(pDnode);
253,157✔
313
  action.pCont = pReq;
253,157✔
314
  action.contLen = contLen;
253,157✔
315
  action.msgType = TDMT_DND_ALTER_SNODE;
253,157✔
316
  action.acceptableCode = 0;
253,157✔
317
  action.groupId = -1;
253,157✔
318

319
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
253,157!
320
    taosMemoryFree(pReq);
×
321
    TAOS_RETURN(code);
×
322
  }
323

324
  TAOS_RETURN(code);
253,157✔
325
}
326

327

328
static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
129,504✔
329
  int32_t        code = 0;
129,504✔
330
  SDDropSnodeReq dropReq = {0};
129,504✔
331
  dropReq.dnodeId = pDnode->id;
129,504✔
332

333
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
129,504✔
334
  void   *pReq = taosMemoryMalloc(contLen);
129,504!
335
  if (pReq == NULL) {
129,504!
336
    code = terrno;
×
337
    TAOS_RETURN(code);
×
338
  }
339
  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
129,504✔
340
  if (code < 0) {
129,504!
341
    mError("snode:%d, failed to serialize create drop snode request since %s", dropReq.dnodeId, terrstr());
×
342
  }
343

344
  STransAction action = {0};
129,504✔
345
  action.epSet = mndGetDnodeEpset(pDnode);
129,504✔
346
  action.pCont = pReq;
129,504✔
347
  action.contLen = contLen;
129,504✔
348
  action.msgType = TDMT_DND_DROP_SNODE;
129,504✔
349
  action.acceptableCode = TSDB_CODE_SNODE_NOT_DEPLOYED;
129,504✔
350

351
  if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
129,504!
352
    taosMemoryFree(pReq);
×
353
    TAOS_RETURN(code);
×
354
  }
355

356
  TAOS_RETURN(code);
129,504✔
357
}
358

359
static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate, int64_t currTs) {
35,799✔
360
  int32_t code = -1;
35,799✔
361

362
  SSnodeObj snodeObj = {0};
35,799✔
363
  snodeObj.id = pDnode->id;
35,799✔
364
  snodeObj.createdTime = currTs;
35,799✔
365
  snodeObj.updateTime = snodeObj.createdTime;
35,799✔
366

367
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-snode");
35,799✔
368
  if (pTrans == NULL) {
35,799!
369
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
370
    if (terrno != 0) code = terrno;
×
371
    goto _OVER;
×
372
  }
373
  mndTransSetSerial(pTrans);
35,799✔
374

375
  mInfo("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
35,799!
376

377
  TAOS_CHECK_GOTO(mndSetCreateSnodeRedoLogs(pTrans, &snodeObj), NULL, _OVER);
35,799!
378
  TAOS_CHECK_GOTO(mndSetCreateSnodeUndoLogs(pTrans, &snodeObj), NULL, _OVER);
35,799!
379
  TAOS_CHECK_GOTO(mndSetCreateSnodeCommitLogs(pTrans, &snodeObj), NULL, _OVER);
35,799!
380
  TAOS_CHECK_GOTO(mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj, pMnode), NULL, _OVER);
35,799!
381
  TAOS_CHECK_GOTO(mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj), NULL, _OVER);
35,799!
382
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
35,799!
383

384
  code = 0;
35,799✔
385

386
_OVER:
35,799✔
387
  mndTransDrop(pTrans);
35,799✔
388
  TAOS_RETURN(code);
35,799✔
389
}
390

391
static bool mndSnodeTraverseForCreate(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
381,905✔
392
  SSnodeObj* pSnode = pObj;
381,905✔
393
  SSnodeObj* pReplica = (SSnodeObj*)p1;
381,905✔
394
  SSnodeObj* pLeader = (SSnodeObj*)p2;
381,905✔
395
  bool* noLeaderGot = (bool*)p3;
381,905✔
396

397
  if (0 == pSnode->leadersId[0]) {
381,905✔
398
    *pReplica = *pSnode;
77,589✔
399
    if (pLeader->id > 0) {
77,589!
400
      return false;
×
401
    }
402

403
    *noLeaderGot = true;
77,589✔
404
  } else if (0 == pReplica->id && 0 == pSnode->leadersId[1]) {
304,316✔
405
    *pReplica = *pSnode;
60,466✔
406
  }
407

408
  if (0 == pSnode->replicaId && 0 == pLeader->id) {
381,905!
409
    *pLeader = *pSnode;
7,349✔
410
    if (*noLeaderGot) {
7,349!
411
      return false;
7,349✔
412
    }
413
  }
414

415
  return true;
374,556✔
416
}
417

418

419
void mndSnodeGetReplicaSnode(SMnode *pMnode, SMCreateSnodeReq *pCreate, SSnodeObj *pReplica, SSnodeObj* pLeader) {
93,705✔
420
  bool noLeaderGot = false;
93,705✔
421
  sdbTraverse(pMnode->pSdb, SDB_SNODE, mndSnodeTraverseForCreate, pReplica, pLeader, &noLeaderGot);
93,705✔
422
}
93,705✔
423

424
static int32_t mndSnodeAppendUpdateToTrans(SMnode *pMnode, SSnodeObj* pObj, STrans *pTrans) {
253,157✔
425
  int32_t code = TSDB_CODE_SUCCESS;
253,157✔
426
  int32_t lino = 0;
253,157✔
427
  SDnodeObj *pDnode2 = mndAcquireDnode(pMnode, pObj->id);
253,157✔
428
  if (pDnode2 == NULL) {
253,157!
429
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
×
430
    goto _exit;
×
431
  }
432
  
433
  SSnodeObj snodeObj2 = {0};
253,157✔
434
  snodeObj2.id = pObj->id;
253,157✔
435
  snodeObj2.leadersId[0] = pObj->leadersId[0];
253,157✔
436
  snodeObj2.leadersId[1] = pObj->leadersId[1];
253,157✔
437
  snodeObj2.replicaId = pObj->replicaId;
253,157✔
438
  snodeObj2.createdTime = pObj->createdTime;
253,157✔
439
  snodeObj2.updateTime = taosGetTimestampMs();
253,157✔
440
  
441
  mInfo("trans:%d, used to update snode:%d", pTrans->id, snodeObj2.id);
253,157!
442
  
443
  TAOS_CHECK_EXIT(mndSetUpdateSnodeRedoLogs(pTrans, &snodeObj2));
253,157!
444
  //TAOS_CHECK_EXIT(mndSetCreateSnodeUndoLogs(pTrans, &snodeObj2));
445
  TAOS_CHECK_EXIT(mndSetCreateSnodeCommitLogs(pTrans, &snodeObj2));
253,157!
446
  TAOS_CHECK_EXIT(mndSetUpdateSnodeRedoActions(pTrans, pDnode2, &snodeObj2, pMnode));
253,157!
447
  //TAOS_CHECK_EXIT(mndSetCreateSnodeUndoActions(pTrans, pDnode2, &snodeObj2));
448

449
_exit:
253,157✔
450

451
  mndReleaseDnode(pMnode, pDnode2);
253,157✔
452

453
  if (code) {
253,157!
454
    mError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
455
  }
456

457
  return code;  
253,157✔
458
}
459

460
static int32_t mndCreateSnodeWithReplicaId(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate, int64_t currTs) {
93,705✔
461
  SSnodeObj replicaSnode = {0};
93,705✔
462
  SSnodeObj leaderSnode = {0};
93,705✔
463
  int32_t code = TSDB_CODE_SUCCESS;
93,705✔
464
  int32_t lino = 0;
93,705✔
465

466
  mndSnodeGetReplicaSnode(pMnode, pCreate, &replicaSnode, &leaderSnode);
93,705✔
467

468
  SSnodeObj snodeObj = {0};
93,705✔
469
  snodeObj.id = pDnode->id;
93,705✔
470
  if (leaderSnode.id > 0) {
93,705✔
471
    snodeObj.leadersId[0] = leaderSnode.id;
7,349✔
472
  }
473
  snodeObj.replicaId = replicaSnode.id;
93,705✔
474
  snodeObj.createdTime = currTs;
93,705✔
475
  snodeObj.updateTime = snodeObj.createdTime;
93,705✔
476

477
  if (leaderSnode.id > 0) {
93,705✔
478
    leaderSnode.replicaId = snodeObj.id;
7,349✔
479
  }
480

481
  if (replicaSnode.id > 0) {
93,705!
482
    SSnodeObj* pTarget = &replicaSnode;
93,705✔
483
    if (replicaSnode.id == leaderSnode.id) {
93,705✔
484
      pTarget = &leaderSnode;
7,349✔
485
    }
486
    if (pTarget->leadersId[0] > 0) {
93,705✔
487
      pTarget->leadersId[1] = snodeObj.id;
19,530✔
488
    } else {
489
      pTarget->leadersId[0] = snodeObj.id;
74,175✔
490
    }
491
  }
492

493
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-snode");
93,705✔
494
  if (pTrans == NULL) {
93,705!
495
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
496
    if (terrno != 0) code = terrno;
×
497
    goto _exit;
×
498
  }
499
  
500
  mndTransSetSerial(pTrans);
93,705✔
501

502
  mInfo("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
93,705!
503

504
  TAOS_CHECK_EXIT(mndSetCreateSnodeRedoLogs(pTrans, &snodeObj));
93,705!
505
  TAOS_CHECK_EXIT(mndSetCreateSnodeUndoLogs(pTrans, &snodeObj));
93,705!
506
  TAOS_CHECK_EXIT(mndSetCreateSnodeCommitLogs(pTrans, &snodeObj));
93,705!
507
  TAOS_CHECK_EXIT(mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj, pMnode));
93,705!
508
  TAOS_CHECK_EXIT(mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj));
93,705!
509

510
  if (leaderSnode.id > 0) {
93,705✔
511
    TAOS_CHECK_EXIT(mndSnodeAppendUpdateToTrans(pMnode, &leaderSnode, pTrans));
7,349!
512
  }
513

514
  if (replicaSnode.id > 0 && replicaSnode.id != leaderSnode.id) {
93,705!
515
    TAOS_CHECK_EXIT(mndSnodeAppendUpdateToTrans(pMnode, &replicaSnode, pTrans));
86,356!
516
  }
517

518
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
93,705!
519

520
_exit:
93,705✔
521

522
  mndTransDrop(pTrans);
93,705✔
523

524
  if (code) {
93,705!
525
    mError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
526
  }
527
  
528
  TAOS_RETURN(code);
93,705✔
529
}
530

531

532
static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
157,427✔
533
  SMnode          *pMnode = pReq->info.node;
157,427✔
534
  int32_t          code = -1;
157,427✔
535
  SSnodeObj       *pObj = NULL;
157,427✔
536
  SDnodeObj       *pDnode = NULL;
157,427✔
537
  SMCreateSnodeReq createReq = {0};
157,427✔
538

539
  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
157,427!
540

541
  mInfo("snode:%d, start to create", createReq.dnodeId);
157,427!
542
  
543
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE), NULL, _OVER);
157,427✔
544

545
  pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
151,277✔
546
  if (pObj != NULL) {
151,277✔
547
    code = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
21,773✔
548
    mndReleaseSnode(pMnode, pObj);
21,773✔
549
    goto _OVER;
21,773✔
550
  } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
129,504!
551
    goto _OVER;
×
552
  }
553

554
  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
129,504✔
555
  if (pDnode == NULL) {
129,504!
556
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
×
557
    goto _OVER;
×
558
  }
559

560
  int64_t currTs = taosGetTimestampMs();
129,504✔
561
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
129,504✔
562
  if (snodeNum > 0) {
129,504✔
563
    code = mndCreateSnodeWithReplicaId(pMnode, pReq, pDnode, &createReq, currTs);
93,705✔
564
  } else {
565
    code = mndCreateSnode(pMnode, pReq, pDnode, &createReq, currTs);
35,799✔
566
  }
567
  
568
  if (code == 0) {
129,504!
569
    code = TSDB_CODE_ACTION_IN_PROGRESS;
129,504✔
570
    
571
    MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_SNODE, currTs);
129,504!
572
  }
573

574
_OVER:
157,325✔
575

576
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
157,427!
577
    mError("snode:%d, failed to create since %s", createReq.dnodeId, tstrerror(code));
27,923!
578
  }
579

580
  mndReleaseDnode(pMnode, pDnode);
157,427✔
581
  tFreeSMCreateQnodeReq(&createReq);
157,427✔
582
  TAOS_RETURN(code);
157,427✔
583
}
584

585
static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
90,622✔
586
  int32_t  code = 0;
90,622✔
587
  SSdbRaw *pRedoRaw = mndSnodeActionEncode(pObj);
90,622✔
588
  if (pRedoRaw == NULL) {
90,622!
589
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
590
    if (terrno != 0) code = terrno;
×
591
    TAOS_RETURN(code);
×
592
  }
593
  TAOS_CHECK_RETURN(mndTransAppendGroupRedolog(pTrans, pRedoRaw, -1));
90,622!
594
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
90,622!
595
  TAOS_RETURN(code);
90,622✔
596
}
597

598
static int32_t mndSetDropSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
90,622✔
599
  int32_t  code = 0;
90,622✔
600
  SSdbRaw *pCommitRaw = mndSnodeActionEncode(pObj);
90,622✔
601
  if (pCommitRaw == NULL) {
90,622!
602
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
603
    if (terrno != 0) code = terrno;
×
604
    TAOS_RETURN(code);
×
605
  }
606
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
90,622!
607
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
90,622!
608
  TAOS_RETURN(code);
90,622✔
609
}
610

611
static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
90,622✔
612
  int32_t        code = 0;
90,622✔
613
  SDDropSnodeReq dropReq = {0};
90,622✔
614
  dropReq.dnodeId = pDnode->id;
90,622✔
615

616
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
90,622✔
617
  void   *pReq = taosMemoryMalloc(contLen);
90,622!
618
  if (pReq == NULL) {
90,622!
619
    code = terrno;
×
620
    TAOS_RETURN(code);
×
621
  }
622
  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
90,622✔
623
  if (code < 0) {
90,622!
624
    mError("snode:%d, failed to serialize create drop snode request since %s", dropReq.dnodeId, terrstr());
×
625
  }
626

627
  STransAction action = {0};
90,622✔
628
  action.epSet = mndGetDnodeEpset(pDnode);
90,622✔
629
  action.pCont = pReq;
90,622✔
630
  action.contLen = contLen;
90,622✔
631
  action.msgType = TDMT_DND_DROP_SNODE;
90,622✔
632
  action.acceptableCode = TSDB_CODE_SNODE_NOT_DEPLOYED;
90,622✔
633
  action.groupId = -1;
90,622✔
634

635
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
90,622!
636
    taosMemoryFree(pReq);
×
637
    TAOS_RETURN(code);
×
638
  }
639

640
  TAOS_RETURN(code);
90,622✔
641
}
642

643
int32_t mndSetDropSnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SSnodeObj *pObj, bool force) {
90,622✔
644
  if (pObj == NULL) return 0;
90,622!
645
  TAOS_CHECK_RETURN(mndSetDropSnodeRedoLogs(pTrans, pObj));
90,622!
646
  TAOS_CHECK_RETURN(mndSetDropSnodeCommitLogs(pTrans, pObj));
90,622!
647
  if (!force) {
90,622!
648
    TAOS_CHECK_RETURN(mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj));
90,622!
649
  }
650
  return 0;
90,622✔
651
}
652

653
static bool mndSnodeTraverseForDropAff(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
654
  SSnodeObj* pSnode = pObj;
×
655
  SSnodeDropTraversaCtx* ctx = (SSnodeDropTraversaCtx*)p1;
×
656

657
  if (pSnode->replicaId == ctx->target->id) {
×
658
    if (ctx->affNum <= 1) {
×
659
      ctx->affSnode[ctx->affNum++] = *pSnode;
×
660
    } else {
661
      mError("%d snodes with same replicaId:%d, %d, %d, %d", ctx->affNum + 1, ctx->target->id, 
×
662
          ctx->affSnode[0].id, ctx->affSnode[1].id, pSnode->id);
663
    }
664
  }
665

666
  return true;
×
667
}
668

669
static bool mndSnodeTraverseForNewReplica(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
43,214✔
670
  SSnodeObj* pSnode = pObj;
43,214✔
671
  SSnodeObj* pSnode1 = (SSnodeObj*)p1;
43,214✔
672
  SSnodeObj* pSnode2 = (SSnodeObj*)p2;
43,214✔
673

674
  if (pSnode->id == pSnode1->id || pSnode->id == pSnode2->id) {
43,214✔
675
    return true;
20,193✔
676
  }
677

678
  int32_t n = pSnode->leadersId[0] ? (pSnode->leadersId[1] ? 2 : 1) : 0;
23,021!
679
  if (0 == n) {
23,021✔
680
    *(SSnodeObj*)p3 = *pSnode;
11,487✔
681
    return false;
11,487✔
682
  } else if (1 == n && 0 == ((SSnodeObj*)p3)->id) {
11,534!
683
    *(SSnodeObj*)p3 = *pSnode;
5,752✔
684
  }
685

686
  return true;
11,534✔
687
}
688

689

690
static int32_t mndSnodeCheckDropReplica(SMnode *pMnode, SSnodeObj* pObj, SSnodeDropTraversaCtx* pCtx) {
×
691
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
×
692
  if (snodeNum <= 1) {
×
693
    mWarn("no need to get drop snode replica, replicaId:%d, remainSnode:%d", pObj->replicaId, snodeNum);
×
694
    return TSDB_CODE_SUCCESS;
×
695
  }
696

697
  sdbTraverse(pMnode->pSdb, SDB_SNODE, mndSnodeTraverseForDropAff, pCtx, NULL, NULL);
×
698

699
  if (0 == pCtx->affNum) {
×
700
    mDebug("no affected snode for dropping snode %d", pObj->id);
×
701
    return TSDB_CODE_SUCCESS;
×
702
  }
703

704
  if (2 == pCtx->affNum) {
×
705
    sdbTraverse(pMnode->pSdb, SDB_SNODE, mndSnodeTraverseForNewReplica, pCtx->target, &pCtx->affSnode[0], &pCtx->affNewReplica[0]);
×
706
    sdbTraverse(pMnode->pSdb, SDB_SNODE, mndSnodeTraverseForNewReplica, pCtx->target, &pCtx->affSnode[1], &pCtx->affNewReplica[1]);
×
707

708
    mDebug("two affected snodes for dropping snode %d, first:%d newRId:%d, sencod:%d newRId:%d", 
×
709
        pObj->id, pCtx->affSnode[0].id, pCtx->affNewReplica[0].id, pCtx->affSnode[1].id, pCtx->affNewReplica[1].id);
710

711
    return TSDB_CODE_SUCCESS;
×
712
  }
713

714
  SSnodeObj* replicaSnode = mndAcquireSnode(pMnode, pObj->replicaId);
×
715
  pCtx->affNewReplica[0] = *replicaSnode;
×
716
  mndReleaseSnode(pMnode, replicaSnode);
×
717
  
718
  mDebug("one affected snodes for dropping snode %d, first:%d newRId:%d", pObj->id, pCtx->affSnode[0].id, pCtx->affNewReplica[0].id);
×
719

720
  return TSDB_CODE_SUCCESS;
×
721
}
722

723
static void mndSnodeRemoveLeaderId(SSnodeObj *pObj, int32_t leaderId) {
271,866✔
724
  if (NULL == pObj || 0 == pObj->id) {
271,866!
725
    return;
123,901✔
726
  }
727
  
728
  if (pObj->leadersId[1] == leaderId) {
147,965✔
729
    pObj->leadersId[1] = 0;
12,058✔
730
    return;
12,058✔
731
  }
732

733
  if (pObj->leadersId[0] == leaderId) {
135,907✔
734
    if (pObj->leadersId[1] > 0) {
76,527✔
735
      pObj->leadersId[0] = pObj->leadersId[1];
17,864✔
736
      pObj->leadersId[1] = 0;
17,864✔
737
      return;
17,864✔
738
    }
739

740
    pObj->leadersId[0] = 0;
58,663✔
741
  }
742
}
743

744
static void mndSnodeAddLeaderId(SSnodeObj *pObj, int32_t leaderId) {
85,875✔
745
  if (NULL == pObj || 0 == pObj->id || 0 == leaderId) {
85,875!
746
    return;
4,614✔
747
  }
748
  
749
  if (0 == pObj->leadersId[0]) {
81,261✔
750
    pObj->leadersId[0] = leaderId;
57,029✔
751
    return;
57,029✔
752
  }
753

754
  if (0 == pObj->leadersId[1]) {
24,232!
755
    pObj->leadersId[1] = leaderId;
24,232✔
756
    return;
24,232✔
757
  }
758

759
  mError("snode %d can't add new leaderId %d since already has two leaders:%d, %d", pObj->id, leaderId, pObj->leadersId[0], pObj->leadersId[1]);
×
760
}
761

762
int32_t mndDropSnodeImpl(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj, STrans *pTrans, bool force) {
90,622✔
763
  int32_t code = TSDB_CODE_SUCCESS;
90,622✔
764
  int32_t lino = 0;
90,622✔
765
  SArray* streamList = NULL;
90,622✔
766
  SSnodeObj asReplicaOf[2] = {0};
90,622✔
767
  SSnodeObj replicaSnode = {0};
90,622✔
768
  SSnodeObj* pTmp = NULL;
90,622✔
769

770
  if (pObj->leadersId[0] > 0) {
90,622✔
771
    pTmp = mndAcquireSnode(pMnode, pObj->leadersId[0]);
73,774✔
772
    TSDB_CHECK_NULL(pTmp, code, lino, _exit, terrno);
73,774!
773
    asReplicaOf[0] = *pTmp;
73,774✔
774
    mndReleaseSnode(pMnode, pTmp);
73,774✔
775
  }  
776

777
  if (pObj->leadersId[1] > 0) {
90,622✔
778
    pTmp = mndAcquireSnode(pMnode, pObj->leadersId[1]);
12,101✔
779
    TSDB_CHECK_NULL(pTmp, code, lino, _exit, terrno);
12,101!
780
    asReplicaOf[1] = *pTmp;
12,101✔
781
    mndReleaseSnode(pMnode, pTmp);
12,101✔
782
  }
783

784
  if (pObj->replicaId > 0 && pObj->replicaId != pObj->leadersId[0] && pObj->replicaId != pObj->leadersId[1]) {
90,622✔
785
    pTmp = mndAcquireSnode(pMnode, pObj->replicaId);
62,090✔
786
    TSDB_CHECK_NULL(pTmp, code, lino, _exit, terrno);
62,090!
787
    replicaSnode = *pTmp;
62,090✔
788
    mndReleaseSnode(pMnode, pTmp);
62,090✔
789
  }
790
  
791
  TAOS_CHECK_EXIT(msmCheckSnodeReassign(pMnode, pObj, &streamList));
90,622!
792

793
  int32_t streamNum = taosArrayGetSize(streamList);
90,622✔
794
  if (streamNum > 0) {
90,622✔
795
    for (int32_t i = 0; i < streamNum; ++i) {
5,971✔
796
      SStreamObj* pStream = taosArrayGetP(streamList, i);
3,811✔
797
      atomic_store_32(&pStream->mainSnodeId, pObj->replicaId);
3,811✔
798
      TAOS_CHECK_EXIT(mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY));
3,811!
799
    }
800
  }
801

802
  mndSnodeRemoveLeaderId(&asReplicaOf[0], pObj->id);
90,622✔
803
  mndSnodeRemoveLeaderId(&asReplicaOf[1], pObj->id);
90,622✔
804
  mndSnodeRemoveLeaderId(&replicaSnode, pObj->id);
90,622✔
805

806
  if (asReplicaOf[0].id > 0) {
90,622✔
807
    if (asReplicaOf[1].id > 0) {
73,774✔
808
      asReplicaOf[0].replicaId = asReplicaOf[1].id;
12,101✔
809
      asReplicaOf[1].replicaId = asReplicaOf[0].id;
12,101✔
810
      mndSnodeAddLeaderId(&asReplicaOf[0], asReplicaOf[1].id);
12,101✔
811
      mndSnodeAddLeaderId(&asReplicaOf[1], asReplicaOf[0].id);
12,101✔
812
    } else {
813
      if (0 == replicaSnode.id) {
61,673✔
814
        sdbTraverse(pMnode->pSdb, SDB_SNODE, mndSnodeTraverseForNewReplica, pObj, &asReplicaOf[0], &replicaSnode);
16,101✔
815
      }
816
      
817
      asReplicaOf[0].replicaId = replicaSnode.id;
61,673✔
818
      mndSnodeAddLeaderId(&replicaSnode, asReplicaOf[0].id);
61,673✔
819
    }
820
  }
821

822
  if (asReplicaOf[0].id > 0) {
90,622✔
823
    mInfo("trans:%d, used to update snode:%d", pTrans->id, asReplicaOf[0].id);
73,774!
824
    TAOS_CHECK_EXIT(mndSnodeAppendUpdateToTrans(pMnode, &asReplicaOf[0], pTrans));
73,774!
825
  }
826
  if (asReplicaOf[1].id > 0) {
90,622✔
827
    mInfo("trans:%d, used to update snode:%d", pTrans->id, asReplicaOf[1].id);
12,101!
828
    TAOS_CHECK_EXIT(mndSnodeAppendUpdateToTrans(pMnode, &asReplicaOf[1], pTrans));
12,101!
829
  }
830
  if (replicaSnode.id > 0) {
90,622✔
831
    mInfo("trans:%d, used to update snode:%d", pTrans->id, replicaSnode.id);
73,577!
832
    TAOS_CHECK_EXIT(mndSnodeAppendUpdateToTrans(pMnode, &replicaSnode, pTrans));
73,577!
833
  }
834

835
  mInfo("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
90,622!
836
  TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj, force), NULL, _exit);
90,622!
837
  
838
_exit:
90,622✔
839

840
  taosArrayDestroy(streamList);
90,622✔
841

842
  if (code) {
90,622!
UNCOV
843
    mError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
844
  }
845
  
846
  TAOS_RETURN(code);
90,622✔
847
}
848

849

850
static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
89,050✔
851
  int32_t code = TSDB_CODE_SUCCESS;
89,050✔
852
  int32_t lino = 0;
89,050✔
853
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-snode");
89,050✔
854
  if (pTrans == NULL) {
89,050!
855
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
856
    if (terrno != 0) code = terrno;
×
857
    goto _exit;
×
858
  }
859
  mndTransSetSerial(pTrans);
89,050✔
860

861
  TAOS_CHECK_GOTO(mndDropSnodeImpl(pMnode, pReq, pObj, pTrans, false), &lino, _exit);
89,050!
862
  
863
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);
89,050!
864

865
_exit:
89,050✔
866

867
  mndTransDrop(pTrans);
89,050✔
868

869
  if (code) {
89,050!
UNCOV
870
    mError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
871
  }
872
  
873
  TAOS_RETURN(code);
89,050✔
874
}
875

876
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
130,749✔
877
  SMnode        *pMnode = pReq->info.node;
130,749✔
878
  int32_t        code = -1;
130,749✔
879
  SSnodeObj     *pObj = NULL;
130,749✔
880
  SMDropSnodeReq dropReq = {0};
130,749✔
881

882
  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
130,749!
883

884
  mInfo("snode:%d, start to drop", dropReq.dnodeId);
130,749!
885
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE), NULL, _OVER);
130,749✔
886

887
  if (dropReq.dnodeId <= 0) {
124,599!
888
    code = TSDB_CODE_INVALID_MSG;
×
889
    goto _OVER;
×
890
  }
891

892
  pObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
124,599✔
893
  if (pObj == NULL) {
124,599✔
894
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
35,549✔
895
    if (terrno != 0) code = terrno;
35,549!
896
    goto _OVER;
35,549✔
897
  }
898

899
  // check deletable
900
  code = mndDropSnode(pMnode, pReq, pObj);
89,050✔
901
  if (code == 0) {
89,050!
902
    code = TSDB_CODE_ACTION_IN_PROGRESS;
89,050✔
903

904
    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_SNODE, taosGetTimestampMs());
178,100!
905
  }
906

907
_OVER:
130,749✔
908
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
130,749!
909
    mError("snode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
41,699!
910
  }
911

912
  mndReleaseSnode(pMnode, pObj);
130,749✔
913
  tFreeSMCreateQnodeReq(&dropReq);
130,749✔
914
  TAOS_RETURN(code);
130,749✔
915
}
916

917
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
222,392✔
918
  SMnode    *pMnode = pReq->info.node;
222,392✔
919
  SSdb      *pSdb = pMnode->pSdb;
222,392✔
920
  int32_t    numOfRows = 0;
222,392✔
921
  int32_t    cols = 0;
222,392✔
922
  SSnodeObj *pObj = NULL;
222,392✔
923

924
  while (numOfRows < rows) {
1,087,686!
925
    pShow->pIter = sdbFetch(pSdb, SDB_SNODE, pShow->pIter, (void **)&pObj);
1,087,686✔
926
    if (pShow->pIter == NULL) break;
1,087,686✔
927

928
    cols = 0;
865,294✔
929
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865,294✔
930
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false), pSdb, pObj);
865,294!
931

932
    char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
865,294✔
933
    STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
865,294!
934

935
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865,294✔
936
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)ep, false), pSdb, pObj);
865,294!
937

938
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865,294✔
939
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false), pSdb,
865,294!
940
                                   pObj);
941

942
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865,294✔
943
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->replicaId, false), pSdb,
865,294!
944
                                   pObj);
945

946
    char asReplicaOf[64 + VARSTR_HEADER_SIZE] = {0};
865,294✔
947
    if (pObj->leadersId[0] && pObj->leadersId[1]) {
865,294✔
948
      snprintf(varDataVal(asReplicaOf), sizeof(asReplicaOf) - VARSTR_HEADER_SIZE, "%d, %d", pObj->leadersId[0], pObj->leadersId[1]);
151,923✔
949
    } else if (pObj->leadersId[0] || pObj->leadersId[1]) {
713,371!
950
      snprintf(varDataVal(asReplicaOf), sizeof(asReplicaOf) - VARSTR_HEADER_SIZE, "%d", pObj->leadersId[0] ? pObj->leadersId[0] : pObj->leadersId[1]);
515,089!
951
    } else {
952
      strcpy(varDataVal(asReplicaOf), "None");
198,282!
953
    }
954
    varDataSetLen(asReplicaOf, strlen(varDataVal(asReplicaOf)));
865,294!
955
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865,294✔
956
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)asReplicaOf, false), pSdb,
865,294!
957
                                  pObj);
958

959
    numOfRows++;
865,294✔
960
    sdbRelease(pSdb, pObj);
865,294✔
961
  }
962

963
  pShow->numOfRows += numOfRows;
222,392✔
964

965
  return numOfRows;
222,392✔
966
}
967

968
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter) {
×
969
  SSdb *pSdb = pMnode->pSdb;
×
970
  sdbCancelFetchByType(pSdb, pIter, SDB_SNODE);
×
971
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc