• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4887

16 Dec 2025 08:27AM UTC coverage: 65.289% (-0.003%) from 65.292%
#4887

push

travis-ci

web-flow
feat[TS-7233]: audit (#33850)

377 of 536 new or added lines in 28 files covered. (70.34%)

1025 existing lines in 111 files now uncovered.

178977 of 274129 relevant lines covered (65.29%)

102580217.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.34
/source/dnode/mnode/impl/src/mndMnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndCluster.h"
19
#include "mndDnode.h"
20
#include "mndMnode.h"
21
#include "mndPrivilege.h"
22
#include "mndShow.h"
23
#include "mndSync.h"
24
#include "mndTrans.h"
25
#include "tmisce.h"
26

27
#define MNODE_VER_NUMBER   2
28
#define MNODE_RESERVE_SIZE 64
29

30
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
31
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
32
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
33
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
34
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
35
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
36
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
37
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
38
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
39
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
40
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
41
static void     mndReloadSyncConfig(SMnode *pMnode);
42

43
/**
 * Register everything the mnode object type needs: the message handlers for
 * create/alter/drop requests and their responses, the SHOW retrieve and
 * free-iterator callbacks, and the SDB table descriptor for SDB_MNODE rows.
 *
 * @param pMnode  mnode instance whose handler tables and SDB are populated.
 * @return the result of sdbSetTable() for the SDB_MNODE table.
 */
int32_t mndInitMnode(SMnode *pMnode) {
  // Requests are processed in this file; every *_RSP goes to the generic
  // transaction response handler.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

  // SDB callbacks for mnode rows, keyed by a 32-bit id.
  SSdbTable mnodeTable = {
      .sdbType = SDB_MNODE,
      .keyType = SDB_KEY_INT32,
      .deployFp = (SdbDeployFp)mndCreateDefaultMnode,
      .encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
      .decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
      .insertFp = (SdbInsertFp)mndMnodeActionInsert,
      .updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
      .deleteFp = (SdbDeleteFp)mndMnodeActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, mnodeTable);
}
68

69
/* Counterpart of mndInitMnode(); intentionally a no-op. */
void mndCleanupMnode(SMnode *pMnode) { (void)pMnode; }
516,911✔
70

71
/**
 * Look up the mnode row with the given id in the SDB.
 *
 * terrno is cleared before the lookup. When the lookup fails with
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_MNODE_NOT_EXIST; any other terrno is left as set by
 * sdbAcquire().
 *
 * @return the acquired object (hand it back with mndReleaseMnode()), or NULL.
 */
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  terrno = 0;

  SMnodeObj *pMnodeObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
  if (pMnodeObj != NULL) {
    return pMnodeObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return NULL;
}
79

80
/* Hand an object obtained from mndAcquireMnode() back to the SDB via sdbRelease(). */
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pObj);
}
23,724,955✔
84

85
/**
 * Deploy callback for the SDB_MNODE table: creates the first mnode row
 * (id 1, created/updated "now") and commits it through a "create-mnode"
 * transaction.
 *
 * Fixes relative to the previous version:
 *  - every failure path now returns the error code it computed instead of
 *    discarding it in favour of a bare -1;
 *  - pRaw is freed if its status cannot be set before a transaction exists;
 *  - the transaction is dropped if the status cannot be set after the raw
 *    has been appended.
 *
 * @param pMnode  mnode instance the transaction is created on.
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  int32_t   code = 0;
  SMnodeObj mnodeObj = {0};
  mnodeObj.id = 1;
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    // No transaction has taken the raw yet, so it must be released here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-mnode");
  if (pTrans == NULL) {
    // Capture the cause before any cleanup call has a chance to touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    // NOTE(review): who owns pRaw when the append fails is not visible from
    // this file; the original did not free it here and neither does this.
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  // Presumably the transaction owns pRaw from here on (the original never
  // frees it after a successful append) -- so only the transaction is dropped.
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
128

129
/**
 * Serialize an mnode object into a newly allocated SDB raw record
 * (type SDB_MNODE, version MNODE_VER_NUMBER).
 *
 * Field order: id, createdTime, updateTime, role, lastIndex, followed by
 * MNODE_RESERVE_SIZE reserved bytes. terrno is preset to
 * TSDB_CODE_OUT_OF_MEMORY and cleared only after every field was written.
 *
 * @return the raw record, or NULL on failure (any partially built raw is freed).
 */
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
  // `code` and `lino` are kept: the SDB_SET_* macros may reference them.
  int32_t lino = 0;
  int32_t code = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->role, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->lastIndex, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
157

158
/**
 * Rebuild an mnode object from an SDB raw record.
 *
 * Soft versions 1 and 2 are accepted; role and lastIndex are only read for
 * version >= 2. Any other version fails with TSDB_CODE_SDB_INVALID_DATA_VER.
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and cleared only after all
 * fields were read.
 *
 * @return a newly allocated row wrapping the object, or NULL on failure.
 */
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
  // `code` and `lino` are kept: the SDB_GET_* macros may reference them.
  int32_t    lino = 0;
  int32_t    code = 0;
  SMnodeObj *pObj = NULL;
  SSdbRow   *pRow = NULL;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

  if (!(sver == 1 || sver == 2)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SMnodeObj));
  if (pRow == NULL) {
    goto _OVER;
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  if (sver >= 2) {
    SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER)
    SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER)
  }
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
201

202
/**
 * SDB insert callback for mnode rows.
 *
 * Binds the row to the dnode with the same id (acquired even if that dnode
 * is not in ready state), marks the mnode's sync state as offline and
 * reloads the sync configuration.
 *
 * If the dnode cannot be acquired, the failure is logged and reported
 * through the return value, but the sync state is still reset and the sync
 * configuration is still reloaded -- this matches the previous behaviour.
 * NOTE(review): confirm whether continuing after the failure is intended.
 *
 * The dead `int32_t code = 0;` that shadowed the outer `code` inside the
 * error branch has been removed; it had no effect on the returned value.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if the dnode could not be acquired.
 */
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  int32_t code = 0;
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
  pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }

  pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
  mndReloadSyncConfig(pSdb->pMnode);
  TAOS_RETURN(code);
}
217

218
/**
 * SDB delete callback for mnode rows: releases the dnode reference held by
 * the row, if there is one, and clears the pointer.
 *
 * @return always 0.
 */
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;  // nothing was bound to this row
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}
227

228
/**
 * SDB update callback for mnode rows: copies updateTime, lastIndex and role
 * from the new row into the existing one, then reloads the sync
 * configuration.
 *
 * @return always 0.
 */
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
  pOld->lastIndex = pNew->lastIndex;
  pOld->role = pNew->role;

  mndReloadSyncConfig(pSdb->pMnode);
  return 0;
}
237

238
/**
 * Tell whether an mnode row keyed by the given dnode id can be acquired from
 * the SDB. The row is released again before returning.
 */
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pMnodeObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);

  bool found = (pMnodeObj != NULL);
  if (found) {
    sdbRelease(pSdb, pMnodeObj);
  }
  return found;
}
249

250
/**
 * Fill pEpSet by delegating to syncGetRetryEpSet() with the mnode's sync
 * handle. Does nothing when either argument is NULL.
 */
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
  if (pMnode != NULL && pEpSet != NULL) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
  }
}
297

298
/**
 * Encode pObj and append it to the transaction's redo log with status
 * SDB_STATUS_CREATING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing step's code.
 */
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(0);
}
310

311
/**
 * Restore variant: encode pObj and append it to the transaction's redo log
 * with status SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing step's code.
 */
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(0);
}
323

324
/**
 * Encode pObj and append it to the transaction's undo log with status
 * SDB_STATUS_DROPPED.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing step's code.
 */
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(0);
}
336

337
/**
 * Encode pObj and append it to the transaction's commit log with status
 * SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing step's code.
 */
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(0);
}
349

350
/**
 * Serialize a create-mnode request and append it to the transaction as a
 * TDMT_DND_CREATE_MNODE redo action addressed to pCreateEpSet.
 * TSDB_CODE_MNODE_ALREADY_DEPLOYED is set as the action's acceptable code.
 *
 * The sizing call's result is now checked: a negative length is returned as
 * the error instead of being passed to the allocator, where it would be
 * converted to a huge size and surface as a misleading allocation failure.
 *
 * @param pTrans        transaction receiving the action.
 * @param pCreateReq    request to serialize.
 * @param pCreateEpSet  endpoint set the action is sent to (copied).
 * @return 0 on success, otherwise the failing step's code. The serialized
 *         buffer is freed here if serialization or the append fails.
 */
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
  if (contLen < 0) {
    TAOS_RETURN(contLen);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pCreateEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_CREATE_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
      .groupId = -1,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(code);
}
379

380
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
32,934✔
381
                                                SEpSet *pAlterMnodeTypeEpSet) {
382
  int32_t code = 0;
32,934✔
383
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
32,934✔
384
  void   *pReq = taosMemoryMalloc(contLen);
32,934✔
385
  if (pReq == NULL) {
32,934✔
386
    code = terrno;
×
387
    return code;
×
388
  }
389
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
32,934✔
390
  if (code < 0) {
32,934✔
391
    taosMemoryFree(pReq);
×
392
    TAOS_RETURN(code);
×
393
  }
394

395
  STransAction action = {
32,934✔
396
      .epSet = *pAlterMnodeTypeEpSet,
397
      .pCont = pReq,
398
      .contLen = contLen,
399
      .msgType = TDMT_DND_ALTER_MNODE_TYPE,
400
      .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP,
401
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
402
      .groupId = -1,
403
  };
404

405
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
32,934✔
406
    taosMemoryFree(pReq);
×
407
    TAOS_RETURN(code);
×
408
  }
409
  TAOS_RETURN(code);
32,934✔
410
}
411

412
/**
 * Serialize an alter-mnode request and append it to the transaction as a
 * TDMT_MND_ALTER_MNODE redo action addressed to pAlterEpSet.
 *
 * The sizing call's result is now checked: a negative length is returned as
 * the error instead of being passed to the allocator.
 *
 * @return 0 on success, otherwise the failing step's code. The serialized
 *         buffer is freed here if serialization or the append fails.
 */
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
  if (contLen < 0) {
    TAOS_RETURN(contLen);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  // Unlike the sibling builders, this action does not set groupId.
  STransAction action = {
      .epSet = *pAlterEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_MND_ALTER_MNODE,
      .acceptableCode = 0,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
440

441
/**
 * Serialize a drop-mnode request and append it to the transaction as a
 * TDMT_DND_DROP_MNODE redo action addressed to pDroprEpSet.
 * TSDB_CODE_MNODE_NOT_DEPLOYED is set as the action's acceptable code.
 *
 * The sizing call's result is now checked: a negative length is returned as
 * the error instead of being passed to the allocator.
 *
 * @return 0 on success, otherwise the failing step's code. The serialized
 *         buffer is freed here if serialization or the append fails.
 */
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
  if (contLen < 0) {
    TAOS_RETURN(contLen);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pDroprEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_DROP_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
      .groupId = -1,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(code);
}
470

471
/**
 * Build the create-mnode request for a new mnode on pDnode and queue it as a
 * redo action sent to that dnode.
 *
 * Every mnode currently in the SDB is listed as a voter replica or a learner
 * replica according to its role; the new dnode is then appended as one more
 * learner. lastIndex is taken from pObj.
 *
 * @return 0 on success, otherwise the code from building the redo action.
 */
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           createEpset = {0};
  int32_t          voterCnt = 0;
  int32_t          learnerCnt = 0;
  void            *pIter = NULL;

  for (;;) {
    SMnodeObj *pPeer = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pPeer);
    if (pIter == NULL) {
      break;
    }

    if (pPeer->role != TAOS_SYNC_ROLE_VOTER) {
      createReq.learnerReplicas[learnerCnt].id = pPeer->id;
      createReq.learnerReplicas[learnerCnt].port = pPeer->pDnode->port;
      memcpy(createReq.learnerReplicas[learnerCnt].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
      learnerCnt++;
    } else {
      createReq.replicas[voterCnt].id = pPeer->id;
      createReq.replicas[voterCnt].port = pPeer->pDnode->port;
      memcpy(createReq.replicas[voterCnt].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
      voterCnt++;
    }

    sdbRelease(pSdb, pPeer);
  }

  // The node being created joins as the last learner.
  createReq.learnerReplicas[learnerCnt].id = pDnode->id;
  createReq.learnerReplicas[learnerCnt].port = pDnode->port;
  memcpy(createReq.learnerReplicas[learnerCnt].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  createReq.replica = voterCnt;
  createReq.learnerReplica = learnerCnt + 1;
  createReq.lastIndex = pObj->lastIndex;

  // Single-endpoint set pointing at the target dnode.
  createEpset.inUse = 0;
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));

  TAOS_RETURN(0);
}
516

517
/**
 * Restore variant of the create-mnode redo action.
 *
 * Lists every mnode in the SDB except the one whose id equals pDnode->id as
 * a voter or learner replica according to its role, then appends pDnode as a
 * learner. lastIndex is taken from pObj. The request is queued as a redo
 * action sent to pDnode.
 *
 * @return 0 on success, otherwise the code from building the redo action.
 */
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           createEpset = {0};
  void            *pIter = NULL;

  for (;;) {
    SMnodeObj *pPeer = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pPeer);
    if (pIter == NULL) {
      break;
    }

    if (pPeer->id != pDnode->id) {
      if (pPeer->role != TAOS_SYNC_ROLE_VOTER) {
        createReq.learnerReplicas[createReq.learnerReplica].id = pPeer->id;
        createReq.learnerReplicas[createReq.learnerReplica].port = pPeer->pDnode->port;
        memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
        createReq.learnerReplica++;
      } else {
        createReq.replicas[createReq.replica].id = pPeer->id;
        createReq.replicas[createReq.replica].port = pPeer->pDnode->port;
        memcpy(createReq.replicas[createReq.replica].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
        createReq.replica++;
      }
    }

    // Released on every iteration, including the skipped target node.
    sdbRelease(pSdb, pPeer);
  }

  // The node being restored is added back as a learner.
  createReq.learnerReplicas[createReq.learnerReplica].id = pDnode->id;
  createReq.learnerReplicas[createReq.learnerReplica].port = pDnode->port;
  memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  createReq.learnerReplica++;

  createReq.lastIndex = pObj->lastIndex;

  createEpset.inUse = 0;
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));

  TAOS_RETURN(0);
}
564

565
/**
 * Build the alter-mnode-type request for the mnode on pDnode and queue it as
 * a redo action sent to that dnode.
 *
 * Every mnode currently in the SDB is listed as a voter or learner replica
 * according to its role; pDnode is then appended to the voter list.
 * lastIndex is taken from pObj.
 *
 * @return 0 on success, otherwise the code from building the redo action.
 */
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb               *pSdb = pMnode->pSdb;
  SDAlterMnodeTypeReq alterReq = {0};
  SEpSet              createEpset = {0};
  void               *pIter = NULL;

  for (;;) {
    SMnodeObj *pPeer = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pPeer);
    if (pIter == NULL) {
      break;
    }

    if (pPeer->role != TAOS_SYNC_ROLE_VOTER) {
      alterReq.learnerReplicas[alterReq.learnerReplica].id = pPeer->id;
      alterReq.learnerReplicas[alterReq.learnerReplica].port = pPeer->pDnode->port;
      memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
      alterReq.learnerReplica++;
    } else {
      alterReq.replicas[alterReq.replica].id = pPeer->id;
      alterReq.replicas[alterReq.replica].port = pPeer->pDnode->port;
      memcpy(alterReq.replicas[alterReq.replica].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
      alterReq.replica++;
    }

    sdbRelease(pSdb, pPeer);
  }

  // The target node is listed as a voter in the altered configuration.
  alterReq.replicas[alterReq.replica].id = pDnode->id;
  alterReq.replicas[alterReq.replica].port = pDnode->port;
  memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  alterReq.replica++;

  alterReq.lastIndex = pObj->lastIndex;

  createEpset.inUse = 0;
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset));

  TAOS_RETURN(0);
}
607

608
/**
 * Restore variant of the alter-mnode-type redo action.
 *
 * Lists every mnode in the SDB except the one whose id equals pDnode->id as
 * a voter or learner replica according to its role, then appends pDnode to
 * the voter list. lastIndex is taken from pObj. The request is queued as a
 * redo action sent to pDnode.
 *
 * @return 0 on success, otherwise the code from building the redo action.
 */
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb               *pSdb = pMnode->pSdb;
  SDAlterMnodeTypeReq alterReq = {0};
  SEpSet              createEpset = {0};
  void               *pIter = NULL;

  for (;;) {
    SMnodeObj *pPeer = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pPeer);
    if (pIter == NULL) {
      break;
    }

    if (pPeer->id != pDnode->id) {
      if (pPeer->role != TAOS_SYNC_ROLE_VOTER) {
        alterReq.learnerReplicas[alterReq.learnerReplica].id = pPeer->id;
        alterReq.learnerReplicas[alterReq.learnerReplica].port = pPeer->pDnode->port;
        memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
        alterReq.learnerReplica++;
      } else {
        alterReq.replicas[alterReq.replica].id = pPeer->id;
        alterReq.replicas[alterReq.replica].port = pPeer->pDnode->port;
        memcpy(alterReq.replicas[alterReq.replica].fqdn, pPeer->pDnode->fqdn, TSDB_FQDN_LEN);
        alterReq.replica++;
      }
    }

    // Released on every iteration, including the skipped target node.
    sdbRelease(pSdb, pPeer);
  }

  // The node being restored is listed as a voter.
  alterReq.replicas[alterReq.replica].id = pDnode->id;
  alterReq.replicas[alterReq.replica].port = pDnode->port;
  memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  alterReq.replica++;

  alterReq.lastIndex = pObj->lastIndex;

  createEpset.inUse = 0;
  createEpset.numOfEps = 1;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset));

  TAOS_RETURN(0);
}
655

656
/**
 * Build and prepare the serial "create-mnode" transaction for pDnode.
 *
 * Two objects with the dnode's id are staged:
 *  1. a learner (lastIndex = pMnode->applied), used for the create redo
 *     action and the redo log;
 *  2. a voter (lastIndex = pMnode->applied + 1), used for the alter-type
 *     redo action and the commit log.
 *
 * The transaction is dropped before returning, on success and on failure.
 *
 * @return 0 once the transaction is prepared, otherwise the failing step's code.
 */
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
  int32_t code = -1;  // overwritten by TAOS_CHECK_GOTO and the branches below

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  // Stage 1: the node joins as a learner.
  int64_t   learnerTs = taosGetTimestampMs();
  SMnodeObj mnodeObj = {
      .id = pDnode->id,
      .createdTime = learnerTs,
      .updateTime = learnerTs,
      .role = TAOS_SYNC_ROLE_LEARNER,
      .lastIndex = pMnode->applied,
  };

  TAOS_CHECK_GOTO(mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj), NULL, _OVER);

  // Stage 2: the same node is switched to voter.
  int64_t   voterTs = taosGetTimestampMs();
  SMnodeObj mnodeLeaderObj = {
      .id = pDnode->id,
      .createdTime = voterTs,
      .updateTime = voterTs,
      .role = TAOS_SYNC_ROLE_VOTER,
      .lastIndex = pMnode->applied + 1,
  };

  TAOS_CHECK_GOTO(mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &mnodeLeaderObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeLeaderObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
696

697
/**
 * Handle a TDMT_MND_CREATE_MNODE request.
 *
 * Steps: deserialize the request, check the MND_OPER_CREATE_MNODE privilege,
 * reject the request if an mnode with that id already exists, if the dnode
 * does not exist, if there are already 3 or more mnodes, or if the dnode is
 * offline; otherwise start the create transaction. When the audit level is
 * at least AUDIT_LEVEL_SYSTEM, a "createMnode" audit record is written after
 * the transaction attempt.
 *
 * Fixes relative to the previous version:
 *  - when mndAcquireMnode() fails for a reason other than "not exist", the
 *    real error is returned instead of the initial -1;
 *  - the failure log prints tstrerror(code). Most rejection paths only set
 *    `code`, so terrstr() could describe a stale or cleared terrno.
 *
 * @param pReq  incoming RPC message; pReq->info.node is the mnode instance.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 *         otherwise the error code of the failing check or step.
 */
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1;
  SMnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SMCreateMnodeReq createReq = {0};
  int64_t          tss = taosGetTimestampMs();  // start time for the audit duration

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("mnode:%d, start to create", createReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE), NULL, _OVER);

  pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    code = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
    goto _OVER;
  } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
    // Lookup failed for an unexpected reason: propagate that reason.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    code = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char    obj[40] = {0};
    int32_t bytes = snprintf(obj, sizeof(obj), "%d", createReq.dnodeId);
    // The unsigned cast makes a negative snprintf result fail this check too.
    if ((uint32_t)bytes < sizeof(obj)) {
      int64_t tse = taosGetTimestampMs();
      double  duration = (double)(tse - tss);
      duration = duration / 1000;  // milliseconds -> seconds
      auditRecord(pReq, pMnode->clusterId, "createMnode", "", obj, createReq.sql, createReq.sqlLen, duration, 0);
    } else {
      mError("mnode:%d, failed to audit create req since %s", createReq.dnodeId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
    }
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, tstrerror(code));
  }

  mndReleaseMnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);
  tFreeSMCreateQnodeReq(&createReq);

  TAOS_RETURN(code);
}
761

762
/**
 * Encode pObj and append it to the transaction's redo log (group id -1)
 * with status SDB_STATUS_DROPPING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing step's code.
 */
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendGroupRedolog(pTrans, pRaw, -1));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));
  TAOS_RETURN(0);
}
774

775
// Encode pObj, append the encoded row to pTrans as a commit log, then set the
// row's status to SDB_STATUS_DROPPED.
// When encoding returns NULL the result is terrno if it is non-zero, otherwise
// TSDB_CODE_MND_RETURN_VALUE_NULL. Returns 0 when every step succeeds.
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      errCode = terrno;
    }
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(0);
}
787

788
// Queue the redo log and the drop-mnode redo action for removing the mnode
// pObj that lives on pDnode. The order depends on how many mnode rows the sdb
// currently holds:
//   - 2 mnodes: redo log first, then the redo action. A forced drop is refused
//     with TSDB_CODE_MNODE_ONLY_TWO_MNODE.
//   - 3 mnodes: redo action first (skipped when force is set), then redo log.
//   - any other count: returns -1.
// Returns 0 on success or the first failing step's code.
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
                                          bool force) {
  int32_t        code = 0;
  SDDropMnodeReq dropReq = {0};
  SEpSet         dropEpSet = {0};

  // The drop request is addressed to the single endpoint of the target dnode.
  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  int32_t totalMnodes = sdbGetSize(pMnode->pSdb, SDB_MNODE);
  if (totalMnodes == 2) {
    if (force) {
      mError("cant't force drop dnode, since a mnode on it and replica is 2");
      code = TSDB_CODE_MNODE_ONLY_TWO_MNODE;
      TAOS_RETURN(code);
    }
    mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
    TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
    // force is always false here (the forced case returned above), so the
    // redo action is appended unconditionally.
    TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
  } else if (totalMnodes == 3) {
    mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
    if (!force) {
      TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
    }
    TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
  } else {
    TAOS_RETURN(-1);
  }

  TAOS_RETURN(code);
}
825

826
// Attach everything needed to drop mnode pObj to transaction pTrans: the redo
// actions/logs followed by the commit log. pObj->lastIndex is first set to
// pMnode->applied. A NULL pObj is a no-op that returns 0.
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
  if (pObj != NULL) {
    pObj->lastIndex = pMnode->applied;
    TAOS_CHECK_RETURN(mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force));
    TAOS_CHECK_RETURN(mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj));
  }
  return 0;
}
833

834
// Create a serial "drop-mnode" transaction for pObj, check it for conflicts,
// fill it with the drop information (non-forced) and prepare it.
// The transaction object is dropped on every exit path.
// Returns 0 on success. If the transaction cannot be created, returns terrno
// when it is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");

  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
857

858
// Handle a "drop mnode" request.
//
// Steps, in order: deserialize the request, check the MND_OPER_DROP_MNODE
// privilege, then reject the request when
//   - dnodeId <= 0                         -> TSDB_CODE_INVALID_MSG
//   - no mnode exists for that dnode       -> terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL
//   - the target is this mnode itself      -> TSDB_CODE_MND_CANT_DROP_LEADER
//   - the sdb holds at most one mnode      -> TSDB_CODE_MND_TOO_FEW_MNODES
//   - the target's dnode is not online     -> TSDB_CODE_DNODE_OFFLINE
// Otherwise the drop transaction is started; a successful start is reported
// as TSDB_CODE_ACTION_IN_PROGRESS. When tsAuditLevel >= AUDIT_LEVEL_SYSTEM an
// audit record with the elapsed time in seconds is written after the
// transaction attempt.
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SMnodeObj     *pObj = NULL;
  SMDropMnodeReq dropReq = {0};
  int64_t        tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("mnode:%d, start to drop", dropReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  if (dropReq.dnodeId <= 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (pMnode->selfDnodeId == dropReq.dnodeId) {
    code = TSDB_CODE_MND_CANT_DROP_LEADER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    code = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndDropMnode(pMnode, pReq, pObj);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_SYSTEM) {
    char obj[40] = {0};
    (void)tsnprintf(obj, sizeof(obj), "%d", dropReq.dnodeId);

    // Timestamps are in milliseconds; the audit record takes seconds.
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropMnode", "", obj, dropReq.sql, dropReq.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Report the reason from `code`: several failure paths above only assign
    // `code` and never touch terrno, so terrstr() could print a stale error.
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseMnode(pMnode, pObj);
  tFreeSMCreateQnodeReq(&dropReq);
  TAOS_RETURN(code);
}
919

920
// Fill pBlock with up to `rows` mnode rows, continuing from pShow->pIter.
// Columns are written in this order: id, endpoint, role, status, create time,
// role time. Returns the number of rows written by this call; pShow->numOfRows
// is advanced only when the loop finishes without a column-write error.
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SMnodeObj *pObj = NULL;
  SMnodeObj *pSelfObj = NULL;
  ESdbStatus objStatus = 0;
  int64_t    curMs = taosGetTimestampMs();
  int        code = 0;

  // The self mnode row is held for the duration of the call.
  pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
  if (pSelfObj == NULL) {
    mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
    goto _out;
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
    if (pShow->pIter == NULL) {
      break;
    }

    cols = 0;

    // column: mnode id
    SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pCol, numOfRows, (const char *)&pObj->id, false);
    if (code != 0) goto _err;

    // column: endpoint of the hosting dnode
    char epBuf[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(epBuf, pObj->pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);

    pCol = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pCol, numOfRows, epBuf, false);
    if (code != 0) goto _err;

    // column: role. Defaults to "offline"; the self row starts as leader
    // (with a trailing '*' while not restored), and any online dnode then
    // reports its recorded sync state instead.
    char role[20] = "offline";
    if (pObj->id == pMnode->selfDnodeId) {
      snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
    }
    bool online = mndIsDnodeOnline(pObj->pDnode, curMs);
    if (online) {
      tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
      if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
        // Another mnode also claims leadership: show it as an error.
        tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
        mError("mnode:%d, is leader too", pObj->id);
      }
    }
    char roleBuf[12 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(roleBuf, role, pShow->pMeta->pSchemas[cols].bytes);
    pCol = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pCol, numOfRows, (const char *)roleBuf, false);
    if (code != 0) goto _err;

    // column: status. An offline dnode overrides the sdb row status.
    const char *status;
    if (!online) {
      status = "offline";
    } else if (objStatus == SDB_STATUS_DROPPING) {
      status = "dropping";
    } else if (objStatus == SDB_STATUS_CREATING) {
      status = "creating";
    } else {
      status = "ready";
    }
    char statusBuf[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(statusBuf, status, pShow->pMeta->pSchemas[cols].bytes);
    pCol = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pCol, numOfRows, (const char *)statusBuf, false);
    if (code != 0) goto _err;

    // column: create time
    pCol = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pCol, numOfRows, (const char *)&pObj->createdTime, false);
    if (code != 0) goto _err;

    // column: role time, reported as 0 while the dnode is offline
    int64_t roleTimeMs = online ? pObj->roleTimeMs : 0;
    pCol = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pCol, numOfRows, (const char *)&roleTimeMs, false);
    if (code != 0) goto _err;

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += numOfRows;
  goto _out;

_err:
  // A column write failed: stop the iteration and release the current row.
  mError("mnode:%d, failed to set col data val since %s", pObj->id, tstrerror(code));
  sdbCancelFetch(pSdb, pShow->pIter);
  sdbRelease(pSdb, pObj);

_out:
  sdbRelease(pSdb, pSelfObj);
  return numOfRows;
}
1018

1019
// Cancel an in-progress SDB_MNODE iteration identified by pIter.
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_MNODE);
}
×
1023

1024
// Handler for an alter-mnode request. It is currently compiled as a no-op:
// the `#if 1` branch returns 0 without reading the request. The previous
// implementation is kept, disabled, in the `#else` branch.
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
#if 1
  return 0;
#else
  // Disabled: rewrite the mnode option file and reconfigure sync from the
  // replica list carried in the request.
  int32_t         code = 0;
  SMnode         *pMnode = pReq->info.node;
  SDAlterMnodeReq alterReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq));

  SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
  memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
  for (int32_t i = 0; i < option.numOfReplicas; ++i) {
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      option.selfIndex = i;
    }
  }

  if (option.selfIndex == -1) {
    mInfo("alter mnode not processed since selfIndex is -1", terrstr());
    return 0;
  }

  if ((code = mndWriteFile(pMnode->path, &option)) != 0) {
    mError("failed to write mnode file since %s", terrstr());
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      cfg.myIndex = i;
    }
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

  code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
  if (code != 0) {
    mError("failed to sync reconfig since %s", terrstr());
  } else {
    mInfo("alter mnode sync success");
  }

  TAOS_RETURN(code);
#endif
}
1083

1084
// Rebuild the mnode sync configuration from the SDB_MNODE rows and hand it to
// syncReconfig.
//
// Every row in READY or CREATING status becomes one cfg.nodeInfo entry; rows
// whose role is TAOS_SYNC_ROLE_VOTER also count towards cfg.replicaNum.
// cfg.lastIndex ends up as the largest lastIndex among READY, CREATING and
// DROPPING rows. Nothing is reconfigured when this dnode is not among the
// collected entries (cfg.myIndex stays -1) or when pMnode->syncMgmt.sync is
// not positive.
static void mndReloadSyncConfig(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  void      *pIter = NULL;
  int32_t    updatingMnodes = 0;
  int32_t    readyMnodes = 0;
  int32_t    code = 0;
  SSyncCfg   cfg = {
        .myIndex = -1,
        .lastIndex = 0,
  };

  while (1) {
    pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false);
    if (pIter == NULL) break;

    if (objStatus == SDB_STATUS_CREATING || objStatus == SDB_STATUS_DROPPING) {
      mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
      updatingMnodes++;
    }
    if (objStatus == SDB_STATUS_READY) {
      mInfo("vgId:1, has ready mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
      readyMnodes++;
    }

    if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) {
      SNodeInfo *pNode = &cfg.nodeInfo[cfg.totalReplicaNum];
      pNode->nodeId = pObj->pDnode->id;
      pNode->clusterId = mndGetClusterId(pMnode);
      pNode->nodePort = pObj->pDnode->port;
      pNode->nodeRole = pObj->role;
      tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
      // A failed update is only logged; the entry is still added to cfg.
      code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
      if (code != 0) {
        mError("mnode:%d, failed to update dnode info since %s", pObj->id, terrstr());
      }
      mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
      if (pObj->pDnode->id == pMnode->selfDnodeId) {
        cfg.myIndex = cfg.totalReplicaNum;
      }
      if (pNode->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        cfg.replicaNum++;
      }
      cfg.totalReplicaNum++;
      if (pObj->lastIndex > cfg.lastIndex) {
        cfg.lastIndex = pObj->lastIndex;
      }
    }

    // A DROPPING row contributes no node entry but still raises lastIndex.
    if (objStatus == SDB_STATUS_DROPPING) {
      if (pObj->lastIndex > cfg.lastIndex) {
        cfg.lastIndex = pObj->lastIndex;
      }
    }

    mInfo("vgId:1, mnode:%d, role:%d, lastIndex:%" PRId64, pObj->id, pObj->role, pObj->lastIndex);

    sdbReleaseLock(pSdb, pObj, false);
  }

  // if (readyMnodes <= 0 || updatingMnodes <= 0) {
  //   mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
  //   return;
  // }

  if (cfg.myIndex == -1) {
#if 1
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1");
#else
    // cannot reconfig because the leader may fail to elect after reboot
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper");
    syncStop(pMnode->syncMgmt.sync);
#endif
    return;
  }

  if (pMnode->syncMgmt.sync > 0) {
    mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", cfg.totalReplicaNum, cfg.replicaNum,
          cfg.myIndex);

    for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
            pNode->nodeId, pNode->clusterId, pNode->nodeRole);
    }

    // Reuse the function-level `code` rather than declaring a shadowing local.
    code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
    if (code != 0) {
      mError("vgId:1, mnode sync reconfig failed since %s", terrstr());
    } else {
      mInfo("vgId:1, mnode sync reconfig success");
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc