• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.67
/source/dnode/mnode/impl/src/mndMnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndCluster.h"
19
#include "mndDnode.h"
20
#include "mndMnode.h"
21
#include "mndPrivilege.h"
22
#include "mndShow.h"
23
#include "mndSync.h"
24
#include "mndTrans.h"
25
#include "tmisce.h"
26

27
#define MNODE_VER_NUMBER   2
28
#define MNODE_RESERVE_SIZE 64
29

30
static int32_t  mndCreateDefaultMnode(SMnode *pMnode);
31
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
32
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
33
static int32_t  mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
34
static int32_t  mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
35
static int32_t  mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
36
static int32_t  mndProcessCreateMnodeReq(SRpcMsg *pReq);
37
static int32_t  mndProcessAlterMnodeReq(SRpcMsg *pReq);
38
static int32_t  mndProcessDropMnodeReq(SRpcMsg *pReq);
39
static int32_t  mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
40
static void     mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
41
static void     mndReloadSyncConfig(SMnode *pMnode);
42

43
// Registers everything the mnode module needs at start-up: the request and
// response handlers for create/alter/drop mnode, the "show mnodes" retrieve and
// iterator-free hooks, and finally the SDB table descriptor for SDB_MNODE.
// Returns whatever sdbSetTable() returns.
int32_t mndInitMnode(SMnode *pMnode) {
  SSdbTable mnodeTable = {0};
  mnodeTable.sdbType = SDB_MNODE;
  mnodeTable.keyType = SDB_KEY_INT32;
  mnodeTable.deployFp = (SdbDeployFp)mndCreateDefaultMnode;
  mnodeTable.encodeFp = (SdbEncodeFp)mndMnodeActionEncode;
  mnodeTable.decodeFp = (SdbDecodeFp)mndMnodeActionDecode;
  mnodeTable.insertFp = (SdbInsertFp)mndMnodeActionInsert;
  mnodeTable.updateFp = (SdbUpdateFp)mndMnodeActionUpdate;
  mnodeTable.deleteFp = (SdbDeleteFp)mndMnodeActionDelete;

  // Message handlers: requests are processed locally, responses go to the
  // transaction layer.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);

  // Hooks used by the show/retrieve machinery for the mnode table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);

  return sdbSetTable(pMnode->pSdb, mnodeTable);
}
68

69
// Counterpart of mndInitMnode(); it currently performs no work.
void mndCleanupMnode(SMnode *pMnode) {
  (void)pMnode;
}
13✔
70

71
// Looks up the mnode object with id `mnodeId` in the SDB and returns it, or
// NULL when the lookup fails. terrno is cleared before the lookup; a
// TSDB_CODE_SDB_OBJ_NOT_THERE failure is translated into
// TSDB_CODE_MND_MNODE_NOT_EXIST, any other terrno value is left untouched.
// A non-NULL result is handed back via mndReleaseMnode().
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
  terrno = 0;

  SMnodeObj *pMnodeObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
  if (pMnodeObj != NULL) {
    return pMnodeObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
  }
  return NULL;
}
79

80
// Hands an mnode object obtained from mndAcquireMnode() back to the SDB.
// Callers in this file also pass a NULL pObj on error paths; that value is
// forwarded unchanged to sdbRelease().
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
1,629✔
84

85
// Deploy callback for the SDB_MNODE table: builds the record for mnode id 1,
// wraps it in a "create-mnode" transaction as a commit log and prepares the
// transaction.
//
// Returns 0 on success. On failure the specific error code is returned (and
// stored in terrno by TAOS_RETURN) instead of a bare -1, and the raw record /
// transaction created so far is cleaned up on every error path.
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
  int32_t   code = 0;
  SMnodeObj mnodeObj = {0};
  mnodeObj.id = 1;
  mnodeObj.createdTime = taosGetTimestampMs();
  mnodeObj.updateTime = mnodeObj.createdTime;

  SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    // pRaw has not been attached to any transaction yet, so it is freed here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-mnode");
  if (pTrans == NULL) {
    sdbFreeRaw(pRaw);
    mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    // The transaction must not be leaked when the status update fails.
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
128

129
// Serializes an SMnodeObj into a freshly allocated SDB raw record with soft
// version MNODE_VER_NUMBER. Fields are written in the order id, createdTime,
// updateTime, role, lastIndex, followed by MNODE_RESERVE_SIZE reserved bytes.
//
// Returns the raw record, or NULL on failure (the partially built record is
// freed). terrno is 0 on success and non-zero on failure.
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
  // `code` and `lino` are kept in scope for the SDB_SET_* macros.
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Pessimistic default: every early exit below reports a failure unless the
  // macro that triggered it stored something more specific.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->role, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->lastIndex, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
157

158
// Rebuilds an SMnodeObj row from an SDB raw record. Soft versions 1 and 2 are
// accepted; `role` and `lastIndex` are only read when the version is >= 2.
//
// Returns the allocated row, or NULL on failure (the row is freed). terrno is
// 0 on success.
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
  // `code` and `lino` are kept in scope for the SDB_GET_* macros.
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    dataPos = 0;
  int8_t     sver = 0;
  SSdbRow   *pRow = NULL;
  SMnodeObj *pObj = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;

  if (!(sver == 1 || sver == 2)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SMnodeObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  if (sver >= 2) {
    SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER)
    SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER)
  }
  SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
201

202
// SDB insert callback for an mnode row: binds the row to the dnode with the
// same id, marks the sync state offline and reloads the sync configuration.
//
// When the dnode cannot be acquired the error is logged and its code is
// returned, but the row is still marked offline and the sync config is still
// reloaded (unchanged from the previous behaviour).
// NOTE(review): the removed `int32_t code = 0;` inside the error branch
// declared a new shadowing variable and had no effect; if the intent was to
// swallow the error and return 0, that needs an explicit `code = 0;` instead.
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  int32_t code = 0;
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
  pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }

  pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
  mndReloadSyncConfig(pSdb->pMnode);
  TAOS_RETURN(code);
}
217

218
// SDB delete callback for an mnode row: drops the reference to the dnode that
// was bound by the insert callback (if any) and clears the pointer.
// Always returns 0.
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);

  if (pObj->pDnode == NULL) {
    return 0;
  }

  sdbRelease(pSdb, pObj->pDnode);
  pObj->pDnode = NULL;
  return 0;
}
227

UNCOV
228
// SDB update callback for an mnode row: copies role, updateTime and lastIndex
// from the new row into the existing one, then reloads the sync configuration.
// Always returns 0.
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
  mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
  pOld->lastIndex = pNew->lastIndex;
  pOld->role = pNew->role;

  mndReloadSyncConfig(pSdb->pMnode);
  return 0;
}
237

238
// Reports whether an mnode record keyed by `dnodeId` can be acquired from the
// SDB. The temporary reference taken for the check is released before
// returning.
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pMnodeObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
  bool       found = (pMnodeObj != NULL);

  if (found) {
    sdbRelease(pSdb, pMnodeObj);
  }
  return found;
}
249

250
// Fills `pEpSet` with the endpoints of the mnodes recorded in the SDB.
//
// - NULL arguments make the call a no-op.
// - When the SDB holds no mnode rows, the ep set is taken from the sync module
//   via syncGetRetryEpSet().
// - Otherwise every mnode row is visited. For the row that matches this
//   node's own dnode id, `inUse` is pointed at the slot about to be appended
//   when this node is the leader, or at (numOfEps + 1) % totalMnodes when it
//   is not.
//
// Every object handed out by sdbFetch() is now released, including rows whose
// pDnode is NULL; previously such rows were never released and their reference
// leaked.
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
  if (pMnode == NULL || pEpSet == NULL) {
    return;
  }

  SSdb   *pSdb = pMnode->pSdb;
  int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
  if (totalMnodes == 0) {
    syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    return;
  }

  void *pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    if (pObj->id == pMnode->selfDnodeId) {
      if (mndIsLeader(pMnode)) {
        pEpSet->inUse = pEpSet->numOfEps;
      } else {
        pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
      }
    }
    if (pObj->pDnode != NULL) {
      if (addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port) != 0) {
        mError("mnode:%d, failed to add ep:%s:%d into epset", pObj->id, pObj->pDnode->fqdn, pObj->pDnode->port);
      }
    }
    // Release unconditionally: the reference comes from sdbFetch(), not from
    // pDnode being set.
    sdbRelease(pSdb, pObj);

    // NOTE(review): the fallback, the inUse clamp and the sort below run once
    // per fetched row. They look like post-loop steps; left in place to keep
    // the resulting ep set unchanged - confirm the intended placement.
    if (pEpSet->numOfEps == 0) {
      syncGetRetryEpSet(pMnode->syncMgmt.sync, pEpSet);
    }

    if (pEpSet->inUse >= pEpSet->numOfEps) {
      pEpSet->inUse = 0;
    }
    epsetSort(pEpSet);
  }
}
293

UNCOV
294
// Encodes `pObj` and appends it to `pTrans` as a redo log with status
// SDB_STATUS_CREATING. Returns 0 on success; on encode failure returns terrno
// (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0); otherwise the first
// failing step's code.
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(0);
}
306

307
// Restore variant of the create redo log: encodes `pObj` and appends it to
// `pTrans` as a redo log with status SDB_STATUS_READY. Returns 0 on success;
// on encode failure returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0); otherwise the first failing step's code.
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(0);
}
319

320
// Encodes `pObj` and appends it to `pTrans` as an undo log with status
// SDB_STATUS_DROPPED. Returns 0 on success; on encode failure returns terrno
// (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0); otherwise the first
// failing step's code.
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(0);
}
332

UNCOV
333
// Encodes `pObj` and appends it to `pTrans` as a commit log with status
// SDB_STATUS_READY. Returns 0 on success; on encode failure returns terrno
// (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0); otherwise the first
// failing step's code.
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(0);
}
345

UNCOV
346
// Serializes `pCreateReq` and appends it to `pTrans` as a redo action of type
// TDMT_DND_CREATE_MNODE addressed to `pCreateEpSet`. The reply code
// TSDB_CODE_MNODE_ALREADY_DEPLOYED is accepted.
//
// Returns 0 on success or an error code. The serialized buffer is freed here
// on every failure path; on success it is not freed by this function.
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
  if (contLen < 0) {
    // The sizing pass can fail just like the real pass below; never feed a
    // negative length to the allocator.
    TAOS_RETURN(contLen);
  }
  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pCreateEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_CREATE_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(code);
}
374

UNCOV
375
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
×
376
                                                SEpSet *pAlterMnodeTypeEpSet) {
UNCOV
377
  int32_t code = 0;
×
UNCOV
378
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
×
UNCOV
379
  void   *pReq = taosMemoryMalloc(contLen);
×
UNCOV
380
  if (pReq == NULL) {
×
381
    code = terrno;
×
382
    return code;
×
383
  }
UNCOV
384
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
×
UNCOV
385
  if (code < 0) {
×
386
    taosMemoryFree(pReq);
×
387
    TAOS_RETURN(code);
×
388
  }
389

UNCOV
390
  STransAction action = {
×
391
      .epSet = *pAlterMnodeTypeEpSet,
392
      .pCont = pReq,
393
      .contLen = contLen,
394
      .msgType = TDMT_DND_ALTER_MNODE_TYPE,
395
      .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP,
396
      .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
397
  };
398

UNCOV
399
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
400
    taosMemoryFree(pReq);
×
401
    TAOS_RETURN(code);
×
402
  }
UNCOV
403
  TAOS_RETURN(code);
×
404
}
405

406
// Serializes `pAlterReq` and appends it to `pTrans` as a redo action of type
// TDMT_MND_ALTER_MNODE addressed to `pAlterEpSet`. No extra reply code is
// accepted (acceptableCode is 0).
//
// Returns 0 on success or an error code. The serialized buffer is freed here
// on every failure path; on success it is not freed by this function.
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
  if (contLen < 0) {
    // The sizing pass can fail just like the real pass below; never feed a
    // negative length to the allocator.
    TAOS_RETURN(contLen);
  }
  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  STransAction action = {
      .epSet = *pAlterEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_MND_ALTER_MNODE,
      .acceptableCode = 0,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
434

UNCOV
435
// Serializes `pDropReq` and appends it to `pTrans` as a redo action of type
// TDMT_DND_DROP_MNODE addressed to `pDroprEpSet`. The reply code
// TSDB_CODE_MNODE_NOT_DEPLOYED is accepted.
//
// Returns 0 on success or an error code. The serialized buffer is freed here
// on every failure path; on success it is not freed by this function.
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
  int32_t code = 0;
  int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
  if (contLen < 0) {
    // The sizing pass can fail just like the real pass below; never feed a
    // negative length to the allocator.
    TAOS_RETURN(contLen);
  }
  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    code = terrno;
    return code;
  }
  code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
  if (code < 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  STransAction action = {
      .epSet = *pDroprEpSet,
      .pCont = pReq,
      .contLen = contLen,
      .msgType = TDMT_DND_DROP_MNODE,
      .acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
  };

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(code);
}
463

UNCOV
464
// Builds the create-mnode request sent to `pDnode`: every existing mnode is
// listed as a voter or learner replica according to its role, and `pDnode`
// itself is appended as one more learner. `lastIndex` is taken from `pObj`.
// The request is queued on `pTrans` as a redo action addressed to `pDnode`.
//
// NOTE(review): pMObj->pDnode is dereferenced without a NULL check - confirm
// it is always bound for rows visible here.
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           createEpset = {0};
  int32_t          voterCnt = 0;
  int32_t          learnerCnt = 0;

  void *pIter = NULL;
  for (;;) {
    SMnodeObj *pCur = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pCur);
    if (pIter == NULL) break;

    if (pCur->role != TAOS_SYNC_ROLE_VOTER) {
      createReq.learnerReplicas[learnerCnt].id = pCur->id;
      createReq.learnerReplicas[learnerCnt].port = pCur->pDnode->port;
      memcpy(createReq.learnerReplicas[learnerCnt].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
      learnerCnt++;
    } else {
      createReq.replicas[voterCnt].id = pCur->id;
      createReq.replicas[voterCnt].port = pCur->pDnode->port;
      memcpy(createReq.replicas[voterCnt].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
      voterCnt++;
    }

    sdbRelease(pSdb, pCur);
  }

  // The dnode being promoted joins as the trailing learner.
  createReq.learnerReplicas[learnerCnt].id = pDnode->id;
  createReq.learnerReplicas[learnerCnt].port = pDnode->port;
  memcpy(createReq.learnerReplicas[learnerCnt].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  createReq.replica = voterCnt;
  createReq.learnerReplica = learnerCnt + 1;
  createReq.lastIndex = pObj->lastIndex;

  // The action is addressed to the target dnode only.
  createEpset.numOfEps = 1;
  createEpset.inUse = 0;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));

  TAOS_RETURN(0);
}
509

UNCOV
510
// Restore variant of the create-mnode redo action: lists every existing mnode
// except the one whose id equals pDnode->id as voter or learner by role, then
// appends `pDnode` as a learner. `lastIndex` is taken from `pObj`. The request
// is queued on `pTrans` as a redo action addressed to `pDnode`.
//
// NOTE(review): pMObj->pDnode is dereferenced without a NULL check - confirm
// it is always bound for rows visible here.
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb            *pSdb = pMnode->pSdb;
  SDCreateMnodeReq createReq = {0};
  SEpSet           createEpset = {0};

  void *pIter = NULL;
  for (;;) {
    SMnodeObj *pCur = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pCur);
    if (pIter == NULL) break;

    // The mnode being restored is added separately after the loop.
    if (pCur->id != pDnode->id) {
      if (pCur->role != TAOS_SYNC_ROLE_VOTER) {
        createReq.learnerReplicas[createReq.learnerReplica].id = pCur->id;
        createReq.learnerReplicas[createReq.learnerReplica].port = pCur->pDnode->port;
        memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
        createReq.learnerReplica++;
      } else {
        createReq.replicas[createReq.replica].id = pCur->id;
        createReq.replicas[createReq.replica].port = pCur->pDnode->port;
        memcpy(createReq.replicas[createReq.replica].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
        createReq.replica++;
      }
    }

    sdbRelease(pSdb, pCur);
  }

  createReq.learnerReplicas[createReq.learnerReplica].id = pDnode->id;
  createReq.learnerReplicas[createReq.learnerReplica].port = pDnode->port;
  memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  createReq.learnerReplica++;

  createReq.lastIndex = pObj->lastIndex;

  // The action is addressed to the target dnode only.
  createEpset.numOfEps = 1;
  createEpset.inUse = 0;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));

  TAOS_RETURN(0);
}
557

UNCOV
558
// Builds the alter-mnode-type request sent to `pDnode`: every existing mnode
// is listed as voter or learner by role, and `pDnode` is appended as one more
// voter. `lastIndex` is taken from `pObj`. The request is queued on `pTrans`
// as a redo action addressed to `pDnode`.
//
// NOTE(review): pMObj->pDnode is dereferenced without a NULL check - confirm
// it is always bound for rows visible here.
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb               *pSdb = pMnode->pSdb;
  SDAlterMnodeTypeReq alterReq = {0};
  SEpSet              createEpset = {0};

  void *pIter = NULL;
  for (;;) {
    SMnodeObj *pCur = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pCur);
    if (pIter == NULL) break;

    if (pCur->role != TAOS_SYNC_ROLE_VOTER) {
      alterReq.learnerReplicas[alterReq.learnerReplica].id = pCur->id;
      alterReq.learnerReplicas[alterReq.learnerReplica].port = pCur->pDnode->port;
      memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
      alterReq.learnerReplica++;
    } else {
      alterReq.replicas[alterReq.replica].id = pCur->id;
      alterReq.replicas[alterReq.replica].port = pCur->pDnode->port;
      memcpy(alterReq.replicas[alterReq.replica].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
      alterReq.replica++;
    }

    sdbRelease(pSdb, pCur);
  }

  // The target dnode is added to the voter list.
  alterReq.replicas[alterReq.replica].id = pDnode->id;
  alterReq.replicas[alterReq.replica].port = pDnode->port;
  memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  alterReq.replica++;

  alterReq.lastIndex = pObj->lastIndex;

  // The action is addressed to the target dnode only.
  createEpset.numOfEps = 1;
  createEpset.inUse = 0;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset));

  TAOS_RETURN(0);
}
600

UNCOV
601
// Restore variant of the alter-mnode-type redo action: lists every existing
// mnode except the one whose id equals pDnode->id as voter or learner by role,
// then appends `pDnode` as a voter. `lastIndex` is taken from `pObj`. The
// request is queued on `pTrans` as a redo action addressed to `pDnode`.
//
// NOTE(review): pMObj->pDnode is dereferenced without a NULL check - confirm
// it is always bound for rows visible here.
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
  SSdb               *pSdb = pMnode->pSdb;
  SDAlterMnodeTypeReq alterReq = {0};
  SEpSet              createEpset = {0};

  void *pIter = NULL;
  for (;;) {
    SMnodeObj *pCur = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pCur);
    if (pIter == NULL) break;

    // The mnode being restored is added separately after the loop.
    if (pCur->id != pDnode->id) {
      if (pCur->role != TAOS_SYNC_ROLE_VOTER) {
        alterReq.learnerReplicas[alterReq.learnerReplica].id = pCur->id;
        alterReq.learnerReplicas[alterReq.learnerReplica].port = pCur->pDnode->port;
        memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
        alterReq.learnerReplica++;
      } else {
        alterReq.replicas[alterReq.replica].id = pCur->id;
        alterReq.replicas[alterReq.replica].port = pCur->pDnode->port;
        memcpy(alterReq.replicas[alterReq.replica].fqdn, pCur->pDnode->fqdn, TSDB_FQDN_LEN);
        alterReq.replica++;
      }
    }

    sdbRelease(pSdb, pCur);
  }

  alterReq.replicas[alterReq.replica].id = pDnode->id;
  alterReq.replicas[alterReq.replica].port = pDnode->port;
  memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
  alterReq.replica++;

  alterReq.lastIndex = pObj->lastIndex;

  // The action is addressed to the target dnode only.
  createEpset.numOfEps = 1;
  createEpset.inUse = 0;
  createEpset.eps[0].port = pDnode->port;
  memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset));

  TAOS_RETURN(0);
}
648

UNCOV
649
// Builds and prepares the serial "create-mnode" transaction for `pDnode`.
// Steps queued, in order:
//   1. create-mnode redo action + redo log with the mnode as LEARNER
//      (lastIndex = pMnode->applied);
//   2. alter-mnode-type redo action + commit log with the mnode as VOTER
//      (lastIndex = pMnode->applied + 1).
// The transaction handle is dropped before returning on every path.
// Returns 0 on success or the failing step's error code.
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
  int32_t code = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  // Phase 1: the new mnode joins as a learner.
  SMnodeObj learnerObj = {
      .id = pDnode->id,
      .createdTime = taosGetTimestampMs(),
      .role = TAOS_SYNC_ROLE_LEARNER,
      .lastIndex = pMnode->applied,
  };
  learnerObj.updateTime = learnerObj.createdTime;

  TAOS_CHECK_GOTO(mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &learnerObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateMnodeRedoLogs(pMnode, pTrans, &learnerObj), NULL, _OVER);

  // Phase 2: the same mnode is switched to voter.
  SMnodeObj voterObj = {
      .id = pDnode->id,
      .createdTime = taosGetTimestampMs(),
      .role = TAOS_SYNC_ROLE_VOTER,
      .lastIndex = pMnode->applied + 1,
  };
  voterObj.updateTime = voterObj.createdTime;

  TAOS_CHECK_GOTO(mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &voterObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateMnodeCommitLogs(pMnode, pTrans, &voterObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
689

UNCOV
690
// Handler for TDMT_MND_CREATE_MNODE.
//
// Deserializes the request, checks the caller's privilege, then rejects the
// request when: an mnode already exists for the dnode id, the dnode does not
// exist, three or more mnodes already exist, or the dnode is offline.
// Otherwise starts the create-mnode transaction and writes an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, or an
// error code. Acquired objects and the request are released on every path.
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1;
  SMnodeObj       *pObj = NULL;
  SDnodeObj       *pDnode = NULL;
  SMCreateMnodeReq createReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("mnode:%d, start to create", createReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE), NULL, _OVER);

  pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
  if (pObj != NULL) {
    code = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
    goto _OVER;
  } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
    // The lookup failed for a reason other than "not found": report that
    // reason explicitly rather than whatever `code` held from the last check.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_DNODE_NOT_EXIST;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
    code = TSDB_CODE_MND_TOO_MANY_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // Audit the request, keyed by the dnode id rendered as text.
  char    obj[40] = {0};
  int32_t bytes = snprintf(obj, sizeof(obj), "%d", createReq.dnodeId);
  if ((uint32_t)bytes < sizeof(obj)) {
    auditRecord(pReq, pMnode->clusterId, "createMnode", "", obj, createReq.sql, createReq.sqlLen);
  } else {
    mError("mnode:%d, failed to audit create req since %s", createReq.dnodeId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Several failures above only set `code`, so log from it and not terrno.
    mError("mnode:%d, failed to create since %s", createReq.dnodeId, tstrerror(code));
  }

  mndReleaseMnode(pMnode, pObj);
  mndReleaseDnode(pMnode, pDnode);
  tFreeSMCreateQnodeReq(&createReq);

  TAOS_RETURN(code);
}
748

UNCOV
749
// Encodes `pObj` and appends it to `pTrans` as a redo log with status
// SDB_STATUS_DROPPING. Returns 0 on success; on encode failure returns terrno
// (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0); otherwise the first
// failing step's code.
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));
  TAOS_RETURN(0);
}
761

UNCOV
762
// Encodes `pObj` and appends it to `pTrans` as a commit log with status
// SDB_STATUS_DROPPED. Returns 0 on success; on encode failure returns terrno
// (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0); otherwise the first
// failing step's code.
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
  SSdbRaw *pRaw = mndMnodeActionEncode(pObj);
  if (pRaw == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(0);
}
774

UNCOV
775
// Queue the redo log and (unless `force`) the redo action needed to drop the mnode on pDnode.
// The order of the two steps depends on how many mnodes the sdb currently holds:
//   2 mnodes: redo log first, then the drop action; a forced drop is rejected with
//             TSDB_CODE_MNODE_ONLY_TWO_MNODE
//   3 mnodes: drop action first (skipped when `force`), then the redo log
//   other   : returns -1
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
                                          bool force) {
  int32_t        code = 0;
  SDDropMnodeReq dropReq = {.dnodeId = pDnode->id};
  SEpSet         dropEpSet = {.numOfEps = 1};

  // The drop request is addressed to the single endpoint of the dnode hosting the mnode.
  dropEpSet.eps[0].port = pDnode->port;
  memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);

  const int32_t totalMnodes = sdbGetSize(pMnode->pSdb, SDB_MNODE);

  switch (totalMnodes) {
    case 2: {
      if (force) {
        mError("cant't force drop dnode, since a mnode on it and replica is 2");
        code = TSDB_CODE_MNODE_ONLY_TWO_MNODE;
        TAOS_RETURN(code);
      }
      mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
      TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
      // `force` is necessarily false at this point, so the drop action is always queued.
      TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
      break;
    }
    case 3: {
      mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
      if (!force) {
        TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
      }
      TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
      break;
    }
    default: {
      TAOS_RETURN(-1);
    }
  }

  TAOS_RETURN(code);
}
812

UNCOV
813
// Add everything required to drop mnode pObj to transaction pTrans.
// A NULL pObj is accepted and treated as "nothing to do" (returns 0). Otherwise the object's
// lastIndex is set to pMnode->applied before the redo actions/logs and the commit log are staged.
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
  if (pObj != NULL) {
    pObj->lastIndex = pMnode->applied;
    TAOS_CHECK_RETURN(mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force));
    TAOS_CHECK_RETURN(mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj));
  }

  return 0;
}
820

UNCOV
821
// Build and prepare the "drop-mnode" transaction for pObj.
// The transaction is created with the retry policy and global conflict scope, switched to
// serial mode, checked for conflicts, filled with the drop info (non-forced) and prepared.
// The local transaction handle is dropped on every exit path.
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");

  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
844

UNCOV
845
// Handle a "drop mnode" request.
//
// Steps: deserialize the request, check the caller's MND_OPER_DROP_MNODE privilege, validate
// the target, start the drop transaction and write an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started successfully,
// otherwise an error code:
//   TSDB_CODE_INVALID_MSG           dnodeId in the request is <= 0
//   terrno / TSDB_CODE_MND_RETURN_VALUE_NULL   the mnode could not be acquired
//   TSDB_CODE_MND_CANT_DROP_LEADER  the target is this node itself (selfDnodeId)
//   TSDB_CODE_MND_TOO_FEW_MNODES    only one (or no) mnode is recorded in the sdb
//   TSDB_CODE_DNODE_OFFLINE         the dnode hosting the mnode is not online
//   any code returned by deserialization, the privilege check or mndDropMnode
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SMnodeObj     *pObj = NULL;
  SMDropMnodeReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("mnode:%d, start to drop", dropReq.dnodeId);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);

  if (dropReq.dnodeId <= 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (pMnode->selfDnodeId == dropReq.dnodeId) {
    code = TSDB_CODE_MND_CANT_DROP_LEADER;
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
    code = TSDB_CODE_MND_TOO_FEW_MNODES;
    goto _OVER;
  }

  if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
    code = TSDB_CODE_DNODE_OFFLINE;
    goto _OVER;
  }

  code = mndDropMnode(pMnode, pReq, pObj);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record is written whether or not mndDropMnode succeeded.
  char obj[40] = {0};
  (void)tsnprintf(obj, sizeof(obj), "%d", dropReq.dnodeId);

  auditRecord(pReq, pMnode->clusterId, "dropMnode", "", obj, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Several failure paths above assign `code` directly without updating terrno, so the
    // reason is derived from `code` itself to keep the log in sync with the returned value.
    mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
  }

  mndReleaseMnode(pMnode, pObj);
  tFreeSMCreateQnodeReq(&dropReq);
  TAOS_RETURN(code);
}
900

UNCOV
901
// Produce up to `rows` rows describing the mnodes stored in the sdb and write them to pBlock.
//
// Iteration state lives in pShow->pIter, so repeated calls continue where the previous one
// stopped. For every mnode six values are written, in this order: id, endpoint of its dnode,
// role, status, created time and role time (0 when the dnode is offline).
//
// Returns the number of rows fully written by this call. pShow->numOfRows is advanced only
// when the loop finishes without a column-write error; on such an error the fetch is
// cancelled and the rows completed so far are returned.
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  int64_t    curMs = taosGetTimestampMs();
  int        code = 0;

  SMnodeObj *pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
  if (pSelfObj == NULL) {
    mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
    goto _out;
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo = NULL;
    cols = 0;

    // value 1: mnode id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
    if (code != 0) goto _err;

    // value 2: endpoint of the hosting dnode, as a var-string
    char epBuf[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(epBuf, pObj->pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, epBuf, false);
    if (code != 0) goto _err;

    // value 3: role. Starts as "offline"; this node reports itself as leader (with a "*"
    // suffix while not restored); an online dnode reports the recorded sync state instead.
    char role[20] = "offline";
    if (pObj->id == pMnode->selfDnodeId) {
      snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
    }

    bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs);
    if (isDnodeOnline) {
      tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
      // Another mnode that also claims leadership is reported with the error state.
      if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
        tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
        mError("mnode:%d, is leader too", pObj->id);
      }
    }

    char roleBuf[12 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(roleBuf, role, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)roleBuf, false);
    if (code != 0) goto _err;

    // value 4: status. "offline" takes precedence over the sdb object status.
    const char *status = NULL;
    if (!isDnodeOnline) {
      status = "offline";
    } else if (objStatus == SDB_STATUS_DROPPING) {
      status = "dropping";
    } else if (objStatus == SDB_STATUS_CREATING) {
      status = "creating";
    } else {
      status = "ready";
    }

    char statusBuf[9 + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(statusBuf, status, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)statusBuf, false);
    if (code != 0) goto _err;

    // value 5: created time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
    if (code != 0) goto _err;

    // value 6: role time, reported as 0 for an offline dnode
    int64_t roleTimeMs = 0;
    if (isDnodeOnline) {
      roleTimeMs = pObj->roleTimeMs;
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
    if (code != 0) goto _err;

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

  pShow->numOfRows += numOfRows;

_out:
  sdbRelease(pSdb, pSelfObj);
  return numOfRows;

_err:
  // Shared cleanup for every column-write failure: stop the iteration and release both objects.
  mError("mnode:%d, failed to set col data val since %s", pObj->id, tstrerror(code));
  sdbCancelFetch(pSdb, pShow->pIter);
  sdbRelease(pSdb, pObj);
  sdbRelease(pSdb, pSelfObj);
  return numOfRows;
}
999

1000
// Cancel an in-progress mnode iteration identified by pIter on this mnode's sdb.
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_MNODE);
}
×
1004

1005
// Handler for an "alter mnode" request.
// With the `#if 1` below, the compiled body is a stub that ignores pReq and returns 0.
// The `#else` branch is preprocessor-disabled; it would rewrite the mnode file from the
// replica list carried in the request and then reconfigure sync.
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
#if 1
  return 0;
#else
  // ---- disabled implementation, not compiled ----
  int32_t         code = 0;
  SMnode         *pMnode = pReq->info.node;
  SDAlterMnodeReq alterReq = {0};

  TAOS_CHECK_RETURN(tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq));

  // Build the on-disk option from the requested replicas and locate this node among them.
  SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
  memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
  for (int32_t i = 0; i < option.numOfReplicas; ++i) {
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      option.selfIndex = i;
    }
  }

  // This node is not part of the requested replica set: nothing to do.
  if (option.selfIndex == -1) {
    // NOTE(review): terrstr() is passed although the format string has no conversion specifier.
    mInfo("alter mnode not processed since selfIndex is -1", terrstr());
    return 0;
  }

  if ((code = mndWriteFile(pMnode->path, &option)) != 0) {
    mError("failed to write mnode file since %s", terrstr());
    TAOS_RETURN(code);
  }

  // Translate the replica list into a sync configuration.
  SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SNodeInfo *pNode = &cfg.nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, alterReq.replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = alterReq.replicas[i].port;
    if (alterReq.replicas[i].id == pMnode->selfDnodeId) {
      cfg.myIndex = i;
    }
  }

  if (cfg.myIndex == -1) {
    mError("failed to alter mnode since myindex is -1");
    return -1;
  } else {
    mInfo("start to alter mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
    for (int32_t i = 0; i < alterReq.replica; ++i) {
      SNodeInfo *pNode = &cfg.nodeInfo[i];
      mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
    }
  }

  code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
  if (code != 0) {
    mError("failed to sync reconfig since %s", terrstr());
  } else {
    mInfo("alter mnode sync success");
  }

  TAOS_RETURN(code);
#endif
}
1064

1065
// Rebuild the sync configuration from the mnode objects in the sdb and hand it to syncReconfig.
//
// Every mnode whose sdb status is READY or CREATING becomes a member of the new configuration;
// members with the voter role also count toward replicaNum. cfg.lastIndex ends up as the
// largest lastIndex seen among READY, CREATING and DROPPING mnodes. Nothing is reconfigured
// when this node is not among the members or when no sync handle is set (sync <= 0).
static void mndReloadSyncConfig(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SMnodeObj *pObj = NULL;
  ESdbStatus objStatus = 0;
  void      *pIter = NULL;
  int32_t    updatingMnodes = 0;
  int32_t    readyMnodes = 0;
  int32_t    code = 0;
  SSyncCfg   cfg = {.myIndex = -1, .lastIndex = 0};

  while ((pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false)) != NULL) {
    const bool isReady = (objStatus == SDB_STATUS_READY);
    const bool isCreating = (objStatus == SDB_STATUS_CREATING);
    const bool isDropping = (objStatus == SDB_STATUS_DROPPING);

    if (isCreating || isDropping) {
      mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
      updatingMnodes++;
    }
    if (isReady) {
      mInfo("vgId:1, has ready mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
      readyMnodes++;
    }

    if (isReady || isCreating) {
      // Ready and creating mnodes are members of the new configuration.
      SNodeInfo *pNode = &cfg.nodeInfo[cfg.totalReplicaNum];
      pNode->nodeId = pObj->pDnode->id;
      pNode->clusterId = mndGetClusterId(pMnode);
      pNode->nodePort = pObj->pDnode->port;
      pNode->nodeRole = pObj->role;
      tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);

      code = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
      if (code != 0) {
        mError("mnode:%d, failed to update dnode info since %s", pObj->id, terrstr());
      }
      mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);

      if (pObj->pDnode->id == pMnode->selfDnodeId) {
        cfg.myIndex = cfg.totalReplicaNum;
      }
      if (pNode->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        cfg.replicaNum++;
      }
      cfg.totalReplicaNum++;
    }

    // Dropping mnodes are not members, but their lastIndex still counts toward the maximum.
    if ((isReady || isCreating || isDropping) && pObj->lastIndex > cfg.lastIndex) {
      cfg.lastIndex = pObj->lastIndex;
    }

    mInfo("vgId:1, mnode:%d, role:%d, lastIndex:%" PRId64, pObj->id, pObj->role, pObj->lastIndex);

    sdbReleaseLock(pSdb, pObj, false);
  }

  // readyMnodes/updatingMnodes are only counted; an early return keyed on them
  // (readyMnodes <= 0 || updatingMnodes <= 0) used to sit here and is disabled.

  if (cfg.myIndex == -1) {
#if 1
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1");
#else
    // cannot reconfig because the leader may fail to elect after reboot
    mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper");
    syncStop(pMnode->syncMgmt.sync);
#endif
    return;
  }

  if (pMnode->syncMgmt.sync <= 0) {
    return;
  }

  mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", cfg.totalReplicaNum, cfg.replicaNum,
        cfg.myIndex);

  for (int32_t idx = 0; idx < cfg.totalReplicaNum; ++idx) {
    SNodeInfo *pNode = &cfg.nodeInfo[idx];
    mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", idx, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId, pNode->nodeRole);
  }

  code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
  if (code == 0) {
    mInfo("vgId:1, mnode sync reconfig success");
  } else {
    mError("vgId:1, mnode sync reconfig failed since %s", terrstr());
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc