• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3546

03 Dec 2024 10:02AM UTC coverage: 60.691% (-0.1%) from 60.839%
#3546

push

travis-ci

web-flow
Merge pull request #29015 from taosdata/fix/TS-5668

[TS-5668] fix(keeper): fix endpoint value too long for column/tag and eliminate warnings

120577 of 253823 branches covered (47.5%)

Branch coverage included in aggregate %.

201666 of 277134 relevant lines covered (72.77%)

18719900.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.9
/source/dnode/mnode/impl/src/mndMnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndCluster.h"
19
#include "mndDnode.h"
20
#include "mndMnode.h"
21
#include "mndPrivilege.h"
22
#include "mndShow.h"
23
#include "mndSync.h"
24
#include "mndTrans.h"
25
#include "tmisce.h"
26

27
#define MNODE_VER_NUMBER   2
28
#define MNODE_RESERVE_SIZE 64
29

30
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
31
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
32
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
33
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
34
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
35
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
36
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
37
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
38
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
39
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
40
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
41
static void     mndReloadSyncConfig(SMnode *pMnode);
42

43
/*
 * Registers the mnode SDB table together with the message handlers and the
 * show-table handlers implemented in this file.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitMnode(SMnode *pMnode) {
  SSdbTable mnodeTable = {0};
  mnodeTable.sdbType = SDB_MNODE;
  mnodeTable.keyType = SDB_KEY_INT32;
  mnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultMnode;
  mnodeTable.encodeFp = (SdbEncodeFp)mndMnodeActionEncode;
  mnodeTable.decodeFp = (SdbDecodeFp)mndMnodeActionDecode;
  mnodeTable.insertFp = (SdbInsertFp)mndMnodeActionInsert;
  mnodeTable.updateFp = (SdbUpdateFp)mndMnodeActionUpdate;
  mnodeTable.deleteFp = (SdbDeleteFp)mndMnodeActionDelete;

  // Create/alter/drop requests are processed here; the *_RSP messages are
  // handed to mndTransProcessRsp.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

  return sdbSetTable(pMnode->pSdb, mnodeTable);
}
68

69
/* Counterpart of mndInitMnode(); intentionally does nothing. */
void mndCleanupMnode(SMnode *pMnode) {
}
70

71
/*
 * Acquires the mnode object keyed by mnodeId from the SDB.
 *
 * terrno is cleared before the lookup. When the lookup yields NULL with
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_MNODE_NOT_EXIST; any other terrno value is left as is.
 */
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  terrno = 0;

  SMnodeObj *pMnodeObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
  if (pMnodeObj != NULL) {
    return pMnodeObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return NULL;
}
79

80
/* Hands pObj back to the SDB owned by pMnode via sdbRelease(). */
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pObj);
}
65,740✔
84

85
/*
 * Deploy callback for the mnode table: builds the row for mnode 1 and
 * submits it through a "create-mnode" transaction.
 *
 * Returns 0 on success, otherwise the error code of the step that failed
 * (terrno is set to the same value by TAOS_RETURN).
 */
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  int32_t   code = 0;
  SMnodeObj mnodeObj = {0};
  mnodeObj.id = 1;
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  // The raw is still owned by this function here, so free it on failure.
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-mnode");
  if (pTrans == NULL) {
    // Capture the error before any cleanup call gets a chance to touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // NOTE(review): pRaw is not freed here, matching the previous behavior;
    // confirm who owns it when the append fails.
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
128

129
/*
 * Serializes an SMnodeObj into a freshly allocated SSdbRaw (version
 * MNODE_VER_NUMBER): id, createdTime, updateTime, role, lastIndex, followed
 * by MNODE_RESERVE_SIZE reserved bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only cleared once every
 * field has been written. Returns NULL (raw freed) when terrno is non-zero
 * at the end.
 */
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;
  int32_t rawSize = sizeof(SMnodeObj) + MNODE_RESERVE_SIZE;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->role, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->lastIndex, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
157

158
/*
 * Rebuilds an SMnodeObj row from an SSdbRaw.
 *
 * Soft versions 1 and 2 are accepted; role and lastIndex are only read for
 * version >= 2. terrno is preset to TSDB_CODE_OUT_OF_MEMORY and cleared after
 * the last field has been read. Returns NULL when the version cannot be
 * read or when terrno is non-zero at the end (the row is freed in that case).
 */
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SMnodeObj *pObj = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

  if (!(sver == 1 || sver == 2)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SMnodeObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  if (sver >= 2) {
    SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER)
    SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER)
  }
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
201

202
/*
 * SDB insert callback for an mnode row.
 *
 * Binds the row to the dnode with the same id (acquired even when that dnode
 * is not ready), marks the mnode offline and reloads the sync configuration.
 *
 * Returns 0 on success. When the dnode cannot be acquired, returns terrno
 * (or TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is 0) without touching the
 * sync state or the sync configuration.
 */
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  int32_t code = 0;
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);

  pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Bail out here: the row has no dnode, so it must not take part in the
    // sync configuration reload below.
    TAOS_RETURN(code);
  }

  pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
  mndReloadSyncConfig(pSdb->pMnode);
  TAOS_RETURN(code);
}
217

218
/*
 * SDB delete callback for an mnode row: releases the dnode reference held in
 * pObj->pDnode, if any, and clears the pointer. Always returns 0.
 */
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}
227

228
/*
 * SDB update callback for an mnode row: copies role, updateTime and lastIndex
 * from the new row into the existing one, then reloads the sync
 * configuration. Always returns 0.
 */
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->lastIndex = pNew->lastIndex;
  pOld->updateTime = pNew->updateTime;
  pOld->role = pNew->role;

  mndReloadSyncConfig(pSdb->pMnode);
  return 0;
}
237

238
/*
 * Reports whether the SDB holds an mnode row keyed by dnodeId. The row is
 * acquired only for the existence check and released again right away.
 */
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pFound = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  bool       exists = (pFound != NULL);

  if (exists) {
    sdbRelease(pSdb, pFound);
  }
  return exists;
}
249

250
/*
 * Fills pEpSet with the endpoints of the mnodes stored in the SDB.
 *
 * When the SDB holds no mnode rows the ep set is taken from
 * syncGetRetryEpSet() instead. While iterating, the entry that belongs to
 * this node (id == selfDnodeId) decides pEpSet->inUse: its own slot when this
 * node is the leader, otherwise the following slot modulo the mnode count.
 * inUse is reset to 0 whenever it falls outside [0, numOfEps), and the set is
 * sorted after each row.
 */
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
  if (totalMnodes == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    return;
  }

  void *pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    if (pObj->id == pMnode->selfDnodeId) {
      if (mndIsLeader(pMnode)) {
        pEpSet->inUse = pEpSet->numOfEps;
      } else {
        pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
      }
    }

    if (pObj->pDnode != NULL) {
      if (addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port) != 0) {
        mError("mnode:%d, failed to add ep:%s:%d into epset", pObj->id, pObj->pDnode->fqdn, pObj->pDnode->port);
      }
    }
    // Every fetched row is released, including rows that have no dnode bound;
    // releasing only inside the branch above leaked the reference.
    sdbRelease(pSdb, pObj);

    if (pEpSet->numOfEps == 0) {
      syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    }

    if (pEpSet->inUse >= pEpSet->numOfEps) {
      pEpSet->inUse = 0;
    }
    epsetSort(pEpSet);
  }
}
289

290
/*
 * Encodes pObj and appends it to the transaction as a redo log with status
 * SDB_STATUS_CREATING. Returns 0 on success, otherwise the failing step's
 * error code (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails).
 */
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(0);
}
302

303
/*
 * Encodes pObj and appends it to the transaction as a redo log with status
 * SDB_STATUS_READY. Returns 0 on success, otherwise the failing step's error
 * code (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails).
 */
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(0);
}
315

316
/*
 * Encodes pObj and appends it to the transaction as an undo log with status
 * SDB_STATUS_DROPPED. Returns 0 on success, otherwise the failing step's
 * error code (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails).
 */
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(0);
}
328

329
/*
 * Encodes pObj and appends it to the transaction as a commit log with status
 * SDB_STATUS_READY. Returns 0 on success, otherwise the failing step's error
 * code (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails).
 */
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(0);
}
341

342
/*
 * Serializes pCreateReq and appends it to the transaction as a
 * TDMT_DND_CREATE_MNODE redo action addressed to pCreateEpSet.
 * TSDB_CODE_MNODE_ALREADY_DEPLOYED is registered as an acceptable reply code.
 *
 * Returns 0 on success; a negative serializer result, the allocator's terrno,
 * or the append error otherwise. The message buffer is freed on every
 * failure path of this function.
 */
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
  // The sizing pass can fail just like the real pass below; never hand a
  // negative length to the allocator.
  if (contLen < 0) {
    TAOS_RETURN(contLen);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pCreateEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_CREATE_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(code);
}
370

371
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
95✔
372
                                                SEpSet *pAlterMnodeTypeEpSet) {
373
  int32_t code = 0;
95✔
374
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
95✔
375
  void   *pReq = taosMemoryMalloc(contLen);
95✔
376
  if (pReq == NULL) {
95!
377
    code = terrno;
×
378
    return code;
×
379
  }
380
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
95✔
381
  if (code < 0) {
95!
382
    taosMemoryFree(pReq);
×
383
    TAOS_RETURN(code);
×
384
  }
385

386
  STransAction action = {
95✔
387
      .epSet = *pAlterMnodeTypeEpSet,
388
      .pCont = pReq,
389
      .contLen = contLen,
390
      .msgType = TDMT_DND_ALTER_MNODE_TYPE,
391
      .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP,
392
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
393
  };
394

395
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
95!
396
    taosMemoryFree(pReq);
×
397
    TAOS_RETURN(code);
×
398
  }
399
  TAOS_RETURN(code);
95✔
400
}
401

402
/*
 * Serializes pAlterReq and appends it to the transaction as a
 * TDMT_MND_ALTER_MNODE redo action addressed to pAlterEpSet.
 *
 * Returns 0 on success; a negative serializer result, the allocator's terrno,
 * or the append error otherwise. The message buffer is freed on every
 * failure path of this function.
 */
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
  // The sizing pass can fail just like the real pass below; never hand a
  // negative length to the allocator.
  if (contLen < 0) {
    TAOS_RETURN(contLen);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pAlterEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_MND_ALTER_MNODE,
      .acceptableCode = 0,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
430

431
/*
 * Serializes pDropReq and appends it to the transaction as a
 * TDMT_DND_DROP_MNODE redo action addressed to pDroprEpSet.
 * TSDB_CODE_MNODE_NOT_DEPLOYED is registered as an acceptable reply code.
 *
 * Returns 0 on success; a negative serializer result, the allocator's terrno,
 * or the append error otherwise. The message buffer is freed on every
 * failure path of this function.
 */
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
  // The sizing pass can fail just like the real pass below; never hand a
  // negative length to the allocator.
  if (contLen < 0) {
    TAOS_RETURN(contLen);
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pDroprEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_DROP_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(code);
}
459

460
/*
 * Builds the create-mnode request sent to pDnode and appends it to the
 * transaction as a redo action.
 *
 * Every mnode currently in the SDB is listed either under replicas (role is
 * TAOS_SYNC_ROLE_VOTER) or under learnerReplicas (any other role); pDnode is
 * then added as one more learner. lastIndex is taken from pObj.
 */
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           createEpset = {0};
  int32_t          voterCnt = 0;
  int32_t          learnerCnt = 0;
  void            *pIter = NULL;

  for (;;) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;

    if (pMObj->role != TAOS_SYNC_ROLE_VOTER) {
      createReq.learnerReplicas[learnerCnt].id = pMObj->id;
      createReq.learnerReplicas[learnerCnt].port = pMObj->pDnode->port;
      memcpy(createReq.learnerReplicas[learnerCnt].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
      learnerCnt++;
    } else {
      createReq.replicas[voterCnt].id = pMObj->id;
      createReq.replicas[voterCnt].port = pMObj->pDnode->port;
      memcpy(createReq.replicas[voterCnt].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
      voterCnt++;
    }

    sdbRelease(pSdb, pMObj);
  }

  // The dnode being promoted occupies the slot after the existing learners.
  createReq.learnerReplicas[learnerCnt].id = pDnode->id;
  createReq.learnerReplicas[learnerCnt].port = pDnode->port;
  memcpy(createReq.learnerReplicas[learnerCnt].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  createReq.replica = voterCnt;
  createReq.learnerReplica = learnerCnt + 1;
  createReq.lastIndex = pObj->lastIndex;

  // The request is sent to the target dnode only.
  createEpset.numOfEps = 1;
  createEpset.inUse = 0;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));

  TAOS_RETURN(0);
}
505

506
/*
 * Restore variant of the create-mnode redo action.
 *
 * Mnodes already in the SDB are listed as voters or learners by role, except
 * the row whose id equals pDnode->id, which is skipped; pDnode itself is then
 * appended as a learner. lastIndex is taken from pObj and the request is
 * addressed to pDnode.
 */
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           createEpset = {0};
  void            *pIter = NULL;

  for (;;) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;

    if (pMObj->id != pDnode->id) {
      if (pMObj->role != TAOS_SYNC_ROLE_VOTER) {
        createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id;
        createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port;
        memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
        createReq.learnerReplica++;
      } else {
        createReq.replicas[createReq.replica].id = pMObj->id;
        createReq.replicas[createReq.replica].port = pMObj->pDnode->port;
        memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
        createReq.replica++;
      }
    }

    sdbRelease(pSdb, pMObj);
  }

  // The node being restored goes in as the last learner.
  createReq.learnerReplicas[createReq.learnerReplica].id = pDnode->id;
  createReq.learnerReplicas[createReq.learnerReplica].port = pDnode->port;
  memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  createReq.learnerReplica++;

  createReq.lastIndex = pObj->lastIndex;

  createEpset.numOfEps = 1;
  createEpset.inUse = 0;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));

  TAOS_RETURN(0);
}
553

554
/*
 * Builds the alter-mnode-type request sent to pDnode and appends it to the
 * transaction as a redo action.
 *
 * Mnodes in the SDB are listed as voters or learners by role; pDnode is then
 * appended to the voter list. lastIndex is taken from pObj.
 */
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb               *pSdb = pMnode->pSdb;
  SDAlterMnodeTypeReq alterReq = {0};
  SEpSet              targetEpset = {0};
  void               *pIter = NULL;

  for (;;) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;

    if (pMObj->role != TAOS_SYNC_ROLE_VOTER) {
      alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
      alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
      memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
      alterReq.learnerReplica++;
    } else {
      alterReq.replicas[alterReq.replica].id = pMObj->id;
      alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
      memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
      alterReq.replica++;
    }

    sdbRelease(pSdb, pMObj);
  }

  // The target dnode is added as the last voter.
  alterReq.replicas[alterReq.replica].id = pDnode->id;
  alterReq.replicas[alterReq.replica].port = pDnode->port;
  memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  alterReq.replica++;

  alterReq.lastIndex = pObj->lastIndex;

  targetEpset.numOfEps = 1;
  targetEpset.inUse = 0;
  targetEpset.eps[0].port = pDnode->port;
  memcpy(targetEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &targetEpset));

  TAOS_RETURN(0);
}
596

597
/*
 * Restore variant of the alter-mnode-type redo action.
 *
 * Mnodes in the SDB are listed as voters or learners by role, except the row
 * whose id equals pDnode->id, which is skipped; pDnode itself is then
 * appended to the voter list. lastIndex is taken from pObj and the request is
 * addressed to pDnode.
 */
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb               *pSdb = pMnode->pSdb;
  SDAlterMnodeTypeReq alterReq = {0};
  SEpSet              targetEpset = {0};
  void               *pIter = NULL;

  for (;;) {
    SMnodeObj *pMObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
    if (pIter == NULL) break;

    if (pMObj->id != pDnode->id) {
      if (pMObj->role != TAOS_SYNC_ROLE_VOTER) {
        alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
        alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
        memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
        alterReq.learnerReplica++;
      } else {
        alterReq.replicas[alterReq.replica].id = pMObj->id;
        alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
        memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
        alterReq.replica++;
      }
    }

    sdbRelease(pSdb, pMObj);
  }

  // The node being restored is added as the last voter.
  alterReq.replicas[alterReq.replica].id = pDnode->id;
  alterReq.replicas[alterReq.replica].port = pDnode->port;
  memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  alterReq.replica++;

  alterReq.lastIndex = pObj->lastIndex;

  targetEpset.numOfEps = 1;
  targetEpset.inUse = 0;
  targetEpset.eps[0].port = pDnode->port;
  memcpy(targetEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &targetEpset));

  TAOS_RETURN(0);
}
644

645
/*
 * Builds and prepares the serial "create-mnode" transaction for pDnode.
 *
 * Two row images are used: a learner image (lastIndex = pMnode->applied) for
 * the create redo action and redo log, and a voter image
 * (lastIndex = pMnode->applied + 1) for the alter-type redo action and the
 * commit log. The transaction is dropped before returning on every path.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
  int32_t code = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  // Step 1: the new node joins as a learner.
  SMnodeObj learnerObj = {0};
  learnerObj.id = pDnode->id;
  learnerObj.createdTime = taosGetTimestampMs();
  learnerObj.updateTime = learnerObj.createdTime;
  learnerObj.role = TAOS_SYNC_ROLE_LEARNER;
  learnerObj.lastIndex = pMnode->applied;

  TAOS_CHECK_GOTO(mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &learnerObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateMnodeRedoLogs(pMnode, pTrans, &learnerObj), NULL, _OVER);

  // Step 2: the same node is switched to voter.
  SMnodeObj voterObj = {0};
  voterObj.id = pDnode->id;
  voterObj.createdTime = taosGetTimestampMs();
  voterObj.updateTime = voterObj.createdTime;
  voterObj.role = TAOS_SYNC_ROLE_VOTER;
  voterObj.lastIndex = pMnode->applied + 1;

  TAOS_CHECK_GOTO(mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &voterObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateMnodeCommitLogs(pMnode, pTrans, &voterObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
685

686
/*
 * Handler for TDMT_MND_CREATE_MNODE.
 *
 * Checks, in order: the request deserializes, the user holds
 * MND_OPER_CREATE_MNODE, no mnode exists yet on the dnode, the dnode exists,
 * fewer than 3 mnodes exist, and the dnode is online. It then starts the
 * create-mnode transaction and writes an audit record.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared,
 * otherwise the error code of the failed check or step.
 */
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1;
  SMnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SMCreateMnodeReq createReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("mnode:%d, start to create", createReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE), NULL, _OVER);

  pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    code = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
    goto _OVER;
  } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
    // The lookup failed for a reason other than "not found": report that
    // reason instead of whatever the previous check left in `code`.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    code = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj[40] = {0};
  (void)snprintf(obj, sizeof(obj), "%d", createReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "createMnode", "", obj, createReq.sql, createReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // terrstr() formats terrno, so align it with the code actually returned;
    // TAOS_RETURN below stores the same value again.
    terrno = code;
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);
  tFreeSMCreateQnodeReq(&createReq);

  TAOS_RETURN(code);
}
741

742
/*
 * Encodes pObj and appends it to the transaction as a redo log with status
 * SDB_STATUS_DROPPING. Returns 0 on success, otherwise the failing step's
 * error code (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails).
 */
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));
  TAOS_RETURN(0);
}
754

755
/*
 * Encodes pObj and appends it to the transaction as a commit log with status
 * SDB_STATUS_DROPPED. Returns 0 on success, otherwise the failing step's
 * error code (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails).
 */
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(0);
}
767

768
/*
 * Appends the redo log and, unless `force` is set, the drop-mnode redo action
 * for the mnode on pDnode. The order depends on how many mnodes exist:
 *   - 2 mnodes: redo log first, then the redo action; a forced drop is
 *     rejected with TSDB_CODE_MNODE_ONLY_TWO_MNODE.
 *   - 3 mnodes: redo action first (skipped when forced), then the redo log.
 *   - any other count: returns -1.
 */
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
                                          bool force) {
  SSdb          *pSdb = pMnode->pSdb;
  SDDropMnodeReq dropReq = {0};
  SEpSet         dropEpSet = {0};

  dropReq.dnodeId = pDnode->id;
  dropEpSet.numOfEps = 1;
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  int32_t mnodeCnt = sdbGetSize(pSdb, SDB_MNODE);
  switch (mnodeCnt) {
    case 2:
      if (force) {
        mError("cant't force drop dnode, since a mnode on it and replica is 2");
        TAOS_RETURN(TSDB_CODE_MNODE_ONLY_TWO_MNODE);
      }
      mInfo("vgId:1, has %d mnodes, exec redo log first", mnodeCnt);
      TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
      // `force` is known to be false here, so the action is always added.
      TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
      break;

    case 3:
      mInfo("vgId:1, has %d mnodes, exec redo action first", mnodeCnt);
      if (!force) {
        TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
      }
      TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
      break;

    default:
      TAOS_RETURN(-1);
  }

  TAOS_RETURN(0);
}
805

806
/*
 * Adds everything needed to drop the mnode pObj to pTrans: its lastIndex is
 * set to pMnode->applied, then the redo actions/logs and the commit log are
 * appended. A NULL pObj is a no-op. Returns 0 on success, otherwise the error
 * code of the failing step.
 */
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
  if (pObj != NULL) {
    pObj->lastIndex = pMnode->applied;
    TAOS_CHECK_RETURN(mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force));
    TAOS_CHECK_RETURN(mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj));
  }
  return 0;
}
813

814
/*
 * Create, populate and prepare the "drop-mnode" transaction for pObj.
 *
 * The transaction is created with TRN_POLICY_RETRY / TRN_CONFLICT_GLOBAL,
 * marked serial, checked for conflicts, filled via
 * mndSetDropMnodeInfoToTrans (non-forced) and finally prepared.
 * The local transaction handle is always passed to mndTransDrop before
 * returning, including when creation failed and the handle is NULL.
 *
 * Returns 0 on success. When the transaction cannot be created the result is
 * terrno if it is non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");

  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  // Every step succeeded.
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
837

838
/*
 * Handle a "drop mnode" request.
 *
 * Steps, each of which aborts the request on failure:
 *   1. deserialize the request and check the MND_OPER_DROP_MNODE privilege;
 *   2. reject a non-positive dnodeId (TSDB_CODE_INVALID_MSG);
 *   3. acquire the mnode object for that dnode;
 *   4. refuse to drop the mnode running on this node
 *      (TSDB_CODE_MND_CANT_DROP_LEADER);
 *   5. refuse when at most one mnode exists (TSDB_CODE_MND_TOO_FEW_MNODES);
 *   6. refuse when the hosting dnode is offline (TSDB_CODE_DNODE_OFFLINE);
 *   7. start the drop transaction and write an audit record.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started
 * successfully, otherwise the error code of the failing step.
 */
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SMnodeObj     *pObj = NULL;
  SMDropMnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("mnode:%d, start to drop", dropReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  if (dropReq.dnodeId <= 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (pMnode->selfDnodeId == dropReq.dnodeId) {
    code = TSDB_CODE_MND_CANT_DROP_LEADER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    code = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndDropMnode(pMnode, pReq, pObj);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record is written whether or not mndDropMnode succeeded.
  char obj[40] = {0};
  (void)snprintf(obj, sizeof(obj), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropMnode", "", obj, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Several branches above record the failure only in `code`. Set terrno from
    // it so the message names this failure and not an earlier, unrelated one.
    // NOTE(review): assumes terrstr() formats terrno - confirm.
    terrno = code;
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
  }

  mndReleaseMnode(pMnode, pObj);
  tFreeSMCreateQnodeReq(&dropReq);
  TAOS_RETURN(code);
}
893

894
/*
 * Fill pBlock with up to `rows` rows describing the mnodes stored in the sdb,
 * continuing from the iterator kept in pShow->pIter.
 *
 * Columns are written in this order for every mnode: id, endpoint of the
 * hosting dnode, role, status, create time and role time.
 *
 * Returns the number of rows written by this call. pShow->numOfRows is
 * advanced by that amount only when the loop finishes without error; when a
 * column write fails the rows completed so far are still returned.
 */
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SMnodeObj *pObj = NULL;
  SMnodeObj *pSelfObj = NULL;
  ESdbStatus objStatus = 0;
  int64_t    curMs = taosGetTimestampMs();
  int        code = 0;

  pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
  if (pSelfObj == NULL) {
    mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
    goto _out;
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
    if (pShow->pIter == NULL) break;

    // Column: mnode id.
    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
    if (code != 0) {
      mError("mnode:%d, failed to set col data val since %s", pObj->id, terrstr());
      goto _out;
    }

    // Column: endpoint of the hosting dnode.
    char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, b1, false);
    if (code != 0) {
      mError("mnode:%d, failed to set col data val since %s", pObj->id, terrstr());
      goto _out;
    }

    // Column: role. Starts as "offline"; the local mnode is shown as leader
    // (suffixed with '*' while not restored); an online dnode then overrides
    // the text with its reported sync state.
    char role[20] = "offline";
    if (pObj->id == pMnode->selfDnodeId) {
      snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
    }
    bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs);
    if (isDnodeOnline) {
      tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
      // A remote mnode that also reports itself as leader is shown as an error.
      if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
        tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
        mError("mnode:%d, is leader too", pObj->id);
      }
    }
    // NOTE(review): b2/b3 are sized by literals while the copy limit comes from
    // the show schema - confirm the schema widths never exceed these buffers.
    char b2[12 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b2, role, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)b2, false);
    if (code != 0) {
      mError("mnode:%d, failed to set col data val since %s", pObj->id, terrstr());
      goto _out;
    }

    // Column: status. "offline" takes precedence over the sdb object status.
    const char *status = "ready";
    if (objStatus == SDB_STATUS_CREATING) status = "creating";
    if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
    if (!isDnodeOnline) status = "offline";
    char b3[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)b3, false);
    if (code != 0) {
      mError("mnode:%d, failed to set col data val since %s", pObj->id, terrstr());
      goto _out;
    }

    // Column: create time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
    if (code != 0) {
      mError("mnode:%d, failed to set col data val since %s", pObj->id, terrstr());
      goto _out;
    }

    // Column: role time, reported as 0 while the dnode is offline.
    int64_t roleTimeMs = (isDnodeOnline) ? pObj->roleTimeMs : 0;
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
    if (code != 0) {
      mError("mnode:%d, failed to set col data val since %s", pObj->id, terrstr());
      goto _out;
    }

    numOfRows++;
    sdbRelease(pSdb, pObj);
    pObj = NULL;  // already released; keeps the release at _out a no-op for this row
  }

  pShow->numOfRows += numOfRows;

_out:
  // pObj is non-NULL here only when a column write failed mid-row; release it
  // so the reference obtained from sdbFetchAll is not leaked.
  sdbRelease(pSdb, pObj);
  sdbRelease(pSdb, pSelfObj);
  return numOfRows;
}
992

993
/*
 * Cancel an in-progress SDB_MNODE fetch identified by pIter by forwarding it
 * to sdbCancelFetchByType on this mnode's sdb.
 */
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_MNODE);
}
×
997

998
/*
 * Handler for the "alter mnode" request.
 *
 * The handler is currently a no-op that always reports success. The previous
 * implementation (rewrite the mnode file and reconfigure sync from the
 * replica list carried by the request) is kept below, compiled out.
 */
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
#if 0
  int32_t         code = 0;
  SMnode         *pMnode = pReq->info.node;
  SDAlterMnodeReq alterReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq));

  SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
  memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
  for (int32_t i = 0; i < option.numOfReplicas; ++i) {
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      option.selfIndex = i;
    }
  }

  if (option.selfIndex == -1) {
    mInfo("alter mnode not processed since selfIndex is -1", terrstr());
    return 0;
  }

  if ((code = mndWriteFile(pMnode->path, &option)) != 0) {
    mError("failed to write mnode file since %s", terrstr());
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      cfg.myIndex = i;
    }
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

  code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
  if (code != 0) {
    mError("failed to sync reconfig since %s", terrstr());
  } else {
    mInfo("alter mnode sync success");
  }

  TAOS_RETURN(code);
#else
  // Active implementation: accept the request without doing anything.
  return 0;
#endif
}
1057

1058
/*
 * Rebuild the sync configuration from the mnode objects in the sdb and, when
 * this node is part of it and a sync handle exists, apply it with
 * syncReconfig.
 *
 * Mnodes in READY or CREATING status become entries of cfg.nodeInfo;
 * cfg.replicaNum counts only those whose role is TAOS_SYNC_ROLE_VOTER, while
 * cfg.totalReplicaNum counts all of them. cfg.lastIndex ends up as the largest
 * lastIndex seen among READY, CREATING and DROPPING mnodes.
 *
 * Nothing is reconfigured when this node is not among the collected entries
 * (cfg.myIndex stays -1) or when pMnode->syncMgmt.sync is not positive.
 * Failures are only logged; the function has no return value.
 */
static void mndReloadSyncConfig(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  void      *pIter = NULL;
  int32_t    updatingMnodes = 0;
  int32_t    readyMnodes = 0;
  int32_t    code = 0;
  SSyncCfg   cfg = {
        .myIndex = -1,
        .lastIndex = 0,
  };

  while (1) {
    pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false);
    if (pIter == NULL) break;
    if (objStatus == SDB_STATUS_CREATING || objStatus == SDB_STATUS_DROPPING) {
      mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
      updatingMnodes++;
    }
    if (objStatus == SDB_STATUS_READY) {
      mInfo("vgId:1, has ready mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
      readyMnodes++;
    }

    if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) {
      // This mnode takes the next free slot of the replica list.
      SNodeInfo *pNode = &cfg.nodeInfo[cfg.totalReplicaNum];
      pNode->nodeId = pObj->pDnode->id;
      pNode->clusterId = mndGetClusterId(pMnode);
      pNode->nodePort = pObj->pDnode->port;
      pNode->nodeRole = pObj->role;
      tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
      code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
      if (code != 0) {
        // Best effort: the entry is kept even when the update fails.
        mError("mnode:%d, failed to update dnode info since %s", pObj->id, terrstr());
      }
      mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
      if (pObj->pDnode->id == pMnode->selfDnodeId) {
        cfg.myIndex = cfg.totalReplicaNum;
      }
      if (pNode->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        cfg.replicaNum++;
      }
      cfg.totalReplicaNum++;
      if (pObj->lastIndex > cfg.lastIndex) {
        cfg.lastIndex = pObj->lastIndex;
      }
    }

    // A mnode being dropped is not a replica but still contributes its lastIndex.
    if (objStatus == SDB_STATUS_DROPPING) {
      if (pObj->lastIndex > cfg.lastIndex) {
        cfg.lastIndex = pObj->lastIndex;
      }
    }

    mInfo("vgId:1, mnode:%d, role:%d, lastIndex:%" PRId64, pObj->id, pObj->role, pObj->lastIndex);

    sdbReleaseLock(pSdb, pObj, false);
  }

  // if (readyMnodes <= 0 || updatingMnodes <= 0) {
  //   mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
  //   return;
  // }

  if (cfg.myIndex == -1) {
#if 1
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1");
#else
    // cannot reconfig because the leader may fail to elect after reboot
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper");
    syncStop(pMnode->syncMgmt.sync);
#endif
    return;
  }

  if (pMnode->syncMgmt.sync > 0) {
    mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", cfg.totalReplicaNum, cfg.replicaNum,
          cfg.myIndex);

    for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
            pNode->nodeId, pNode->clusterId, pNode->nodeRole);
    }

    // Reuse the function-level `code` rather than declaring a shadowing local.
    code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
    if (code != 0) {
      mError("vgId:1, mnode sync reconfig failed since %s", terrstr());
    } else {
      mInfo("vgId:1, mnode sync reconfig success");
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc