• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.88
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndArbGroup.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndStream.h"
26
#include "mndTopic.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "tmisce.h"
31

32
#define VGROUP_VER_NUMBER   1
33
#define VGROUP_RESERVE_SIZE 60
34

35
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
36
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
38
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
39

40
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
42
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
44

45
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
46
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
49

50
/*
 * Registers this module with the mnode: transaction-response routing, vgroup
 * management commands, the vgroup/vnode show handlers and finally the
 * SDB_VGROUP sdb table.
 *
 * @param pMnode  mnode instance whose handler tables are populated
 * @return the result of sdbSetTable() for the vgroup table
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  // Responses to requests issued by vgroup transactions go to the generic transaction handler.
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);

  // Vgroup management commands.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);

  // show vgroups / show vnodes.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  SSdbTable vgroupTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };
  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
88

89
/* Module teardown counterpart of mndInitVgroup(); there is nothing to release. */
void mndCleanupVgroup(SMnode *pMnode) { (void)pMnode; }
1,919✔
90

91
/*
 * Serializes a vgroup into a newly allocated SDB_VGROUP raw record.
 *
 * Field order (mndVgroupActionDecode reads it back in the same order):
 *   vgId, createdTime, updateTime, version, hashBegin, hashEnd, dbName,
 *   dbUid, isTsma, replica, one dnodeId per replica, syncConfChangeVer,
 *   mountVgId, VGROUP_RESERVE_SIZE reserved bytes.
 *
 * @param pVgroup  vgroup to encode (not modified)
 * @return the raw record, or NULL on failure (terrno describes the error)
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  // NOTE(review): code/lino are not referenced directly here; presumably the
  // SDB_SET_* macros use them -- confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t offset = 0;
  SDB_SET_INT32(pRaw, offset, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, offset, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, offset, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, offset, pVgroup->replica, _OVER)
  // Only the member dnode ids are persisted; per-vnode sync status is not.
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, offset, pVgroup->vnodeGid[r].dnodeId, _OVER)
  }
  SDB_SET_INT32(pRaw, offset, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->mountVgId, _OVER)
  SDB_SET_RESERVE(pRaw, offset, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, offset, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
131

132
/*
 * Decodes an SDB_VGROUP raw record into a newly allocated sdb row holding an SVgObj.
 *
 * Fields are read in the order written by mndVgroupActionEncode. syncConfChangeVer
 * is only read when at least 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE bytes remain
 * after the dnode ids; mountVgId and the reserved tail are always read.
 * NOTE(review): this presumably keeps records from an older layout decodable
 * (their longer reserved tail absorbs mountVgId) -- confirm against the format
 * history before changing VGROUP_RESERVE_SIZE.
 *
 * The replica count stored in the record is validated against the capacity of
 * vnodeGid[] so a corrupt record cannot drive writes past the array.
 * For single-replica vgroups the sole vnode's syncState is preset to leader.
 *
 * @param pRaw  raw record to decode
 * @return the new row (owned by the caller), or NULL with terrno set on failure
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not referenced directly here; presumably the
  // SDB_GET_* macros use them -- confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)

  // replica comes straight from the record and bounds the writes into
  // vnodeGid[] below; reject values outside the array's capacity.
  if (pVgroup->replica < 0 ||
      pVgroup->replica > (int32_t)(sizeof(pVgroup->vnodeGid) / sizeof(pVgroup->vnodeGid[0]))) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
    goto _OVER;
  }

  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
    if (pVgroup->replica == 1) {
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
    }
  }
  if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}
189

190
/*
 * sdb validate callback for vgroup raws carried by a transaction.
 *
 * Decodes pRaw into a temporary row and rejects it when its vgId is smaller
 * than the current maximum SDB_VGROUP id. The temporary row is always freed.
 *
 * @return 0 when accepted, otherwise a non-zero code (propagated via TAOS_RETURN)
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  int      code = -1;
  SVgObj  *pVgroup = NULL;
  SSdbRow *pRow = mndVgroupActionDecode(pRaw);

  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }
  if (pVgroup == NULL) {
    // Prefer the error reported by the failing call, if any.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  if (maxVgId <= pVgroup->vgId) {
    code = 0;
  } else {
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  }

_OVER:
  if (pVgroup) mndVgroupActionDelete(pMnode->pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
221

222
/* sdb insert callback for vgroup rows: emits a trace log and always succeeds. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
226

227
/* sdb delete callback for vgroup rows: the object owns no extra resources, so it only traces. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
231

232
/*
 * sdb update callback: applies the fields decoded into pNew onto the cached row pOld.
 *
 * Per-vnode sync/buffer status (fields mndVgroupActionEncode does not persist)
 * is carried over from pOld into pNew for every dnode that is a member of both
 * the old and the new configuration, and then the whole member array of pNew
 * replaces pOld's. Statistics held on pOld (numOfTables, numOfTimeSeries,
 * storage, pointsWritten, compact) are copied into pNew as well.
 *
 * @return always 0
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Number of valid entries in pOld->vnodeGid. Must be captured before
  // pOld->replica is overwritten below; otherwise the member match scans the
  // wrong number of old entries when the replica count changes.
  int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;

  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
      }
    }
  }
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  return 0;
}
265

266
/*
 * Looks up vgroup vgId and takes a reference on it.
 *
 * @return the vgroup (release with mndReleaseVgroup), or NULL. A missing
 *         object is reported in terrno as TSDB_CODE_MND_VGROUP_NOT_EXIST.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pObj = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pObj != NULL) {
    return pObj;
  }
  // Translate the generic sdb "not there" error into the vgroup-specific code.
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
274

275
/* Drops a reference obtained from mndAcquireVgroup(). */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pMnode->pSdb, pVgroup); }
270,501✔
279

280
/*
 * Builds the serialized SCreateVnodeReq asking pDnode to create its vnode of pVgroup.
 *
 * Voters are placed in replicas[] and learners in learnerReplicas[] in vnodeGid
 * order; selfIndex / learnerSelfIndex is pDnode's slot in the array it lands in.
 * Side effect: increments pVgroup->syncConfChangeVer and sends the new value
 * as changeVersion.
 *
 * @param pMnode    mnode used to resolve member dnodes
 * @param pDnode    dnode that will host the vnode; must be a member of pVgroup
 * @param pDb       database whose cfg is copied into the request
 * @param pVgroup   vgroup being created
 * @param pContLen  receives the serialized length on success
 * @return malloc'd buffer owned by the caller, or NULL on failure
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};

  // identity and versions
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  // hash range and tsma
  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;

  // storage, cache and encoding
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.cacheLast = pDb->cfg.cacheLast;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.encryptAlgorithm = pDb->cfg.encryptAlgorithm;

  // data retention
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;

  // write-ahead log
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;

  // replication membership (filled in below)
  createReq.strict = pDb->cfg.strict;
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica  *pReplica =
        isVoter ? &createReq.replicas[createReq.replica] : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }
    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    bool isSelf = (pDnode->id == pVgid->dnodeId);
    if (isVoter) {
      if (isSelf) createReq.selfIndex = createReq.replica;
      createReq.replica++;
    } else {
      if (isSelf) createReq.learnerSelfIndex = createReq.learnerReplica;
      createReq.learnerReplica++;
    }
  }

  // pDnode must be one of the vgroup members.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSCreateVnodeReq(pReq, contLen, &createReq) < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
409

410
/*
 * Builds an SAlterVnodeConfigReq for pVgroup from pDb's current configuration.
 *
 * Buffer layout: an SMsgHead (contLen and vgId in network byte order) followed
 * by the serialized request body.
 *
 * @param pContLen  receives header + body length on success
 * @return malloc'd buffer owned by the caller, or NULL with terrno set
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Only bodyLen bytes follow the header; passing contLen here would overstate
  // the destination buffer by sizeof(SMsgHead).
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
460

461
/*
 * Builds the serialized SAlterVnodeReplicaReq sent to dnodeId to reconfigure
 * the membership of pVgroup.
 *
 * Voters are placed in replicas[] and learners in learnerReplicas[] in vnodeGid
 * order. selfIndex / learnerSelfIndex is dnodeId's slot within the array it was
 * placed in, the same convention mndBuildCreateVnodeReq uses. It is not the
 * member's index in vnodeGid, which differs whenever voters and learners are
 * interleaved or a learner follows voters.
 * Side effect: increments pVgroup->syncConfChangeVer and sends the new value
 * as changeVersion.
 *
 * @param pContLen  receives the serialized length on success
 * @return malloc'd buffer owned by the caller, or NULL on failure
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER);
    int32_t    slot = 0;
    SReplica  *pReplica = NULL;

    if (isVoter) {
      slot = alterReq.replica++;
      pReplica = &alterReq.replicas[slot];
    } else {
      slot = alterReq.learnerReplica++;
      pReplica = &alterReq.learnerReplicas[slot];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = slot;
      } else {
        alterReq.learnerSelfIndex = slot;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  // dnodeId must be one of the vgroup members.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
543

544
/*
 * Builds the serialized request sent to dnodeId to check whether the learner
 * vnodes of pVgroup have caught up.
 *
 * Membership is laid out as in mndBuildAlterVnodeReplicaReq: voters in
 * replicas[], learners in learnerReplicas[], and selfIndex / learnerSelfIndex
 * is dnodeId's slot within the array it was placed in. That slot is not its
 * index in vnodeGid. changeVersion is not set and syncConfChangeVer is not
 * modified here.
 * NOTE(review): the request is serialized with tSerializeSAlterVnodeReplicaReq,
 * which assumes SCheckLearnCatchupReq shares SAlterVnodeReplicaReq's layout -- confirm.
 *
 * @param pContLen  receives the serialized length on success
 * @return malloc'd buffer owned by the caller, or NULL on failure
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER);
    int32_t    slot = 0;
    SReplica  *pReplica = NULL;

    if (isVoter) {
      slot = req.replica++;
      pReplica = &req.replicas[slot];
    } else {
      slot = req.learnerReplica++;
      pReplica = &req.learnerReplicas[slot];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = slot;
      } else {
        req.learnerSelfIndex = slot;
      }
    }
  }

  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  // dnodeId must be one of the vgroup members.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
621

622
/*
 * Builds the serialized SDisableVnodeWriteReq (disable = 1) for vnode vgId.
 *
 * @param pContLen  receives the serialized length on success
 * @return malloc'd buffer owned by the caller, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {.vgId = vgId, .disable = 1};

  mInfo("vgId:%d, build disable vnode write req", vgId);
  int32_t len = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);

  // A sizing failure and an allocation failure are reported the same way.
  void *pBuf = (len < 0) ? NULL : taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, len, &disableReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
650

651
/*
 * Builds the serialized SAlterVnodeHashRangeReq that gives pVgroup (the
 * destination of a split of srcVgId) its hash range.
 *
 * Side effect: increments pVgroup->syncConfChangeVer and sends the new value
 * as changeVersion.
 *
 * @param pContLen  receives the serialized length on success
 * @return malloc'd buffer owned by the caller, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq alterReq = {0};
  alterReq.srcVgId = srcVgId;
  alterReq.dstVgId = pVgroup->vgId;
  alterReq.hashBegin = pVgroup->hashBegin;
  alterReq.hashEnd = pVgroup->hashEnd;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);
  int32_t len = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);

  // A sizing failure and an allocation failure are reported the same way.
  void *pBuf = (len < 0) ? NULL : taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, len, &alterReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
683

684
/*
 * Builds the serialized SDropVnodeReq asking pDnode to drop its vnode of pVgroup.
 *
 * @param pContLen  receives the serialized length on success
 * @return malloc'd buffer owned by the caller, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);
  int32_t len = tSerializeSDropVnodeReq(NULL, 0, &dropReq);

  // A sizing failure and an allocation failure are reported the same way.
  void *pBuf = (len < 0) ? NULL : taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, len, &dropReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
713

714
/*
 * Per-dnode iteration callback: zeroes numOfVnodes and numOfOtherNodes.
 * p1..p3 are unused. Always returns true.
 */
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = (SDnodeObj *)pObj;
  pDnode->numOfOtherNodes = 0;
  pDnode->numOfVnodes = 0;
  return true;
}
720

721
/*
 * sdbTraverse callback collecting candidate dnodes for vnode placement.
 *
 * p1: SArray of SDnodeObj that receives copies of accepted dnodes.
 * p2: int32_t* id of a dnode to skip entirely.
 * p3: optional SArray of int32_t dnode ids. When non-NULL, only listed dnodes
 *     are considered, and an empty list considers none (TS-6191).
 *
 * For each considered dnode, the vnode count, vnode memory usage and mnode
 * flag are refreshed. The dnode is appended to p1 only when it is online and
 * supports vnodes. Returns false (stopping traversal) only if that push fails.
 */
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pArray = p1;
  int32_t    exceptDnodeId = *(int32_t *)p2;
  SArray    *dnodeList = p3;

  if (pDnode->id == exceptDnodeId) return true;

  if (dnodeList != NULL) {
    int32_t listSize = taosArrayGetSize(dnodeList);
    // TS-6191: an explicitly supplied but empty list selects no dnode.
    if (listSize <= 0) return true;

    bool found = false;
    for (int32_t k = 0; k < listSize && !found; ++k) {
      found = (*(int32_t *)taosArrayGet(dnodeList, k) == pDnode->id);
    }
    if (!found) return true;
  }

  int64_t nowMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, nowMs);
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  // An mnode hosted here is counted as extra (weighted) load when scoring.
  if (isMnode) pDnode->numOfOtherNodes++;

  bool eligible = online && pDnode->numOfSupportVnodes > 0;
  if (eligible && taosArrayPush(pArray, pDnode) == NULL) return false;
  return true;
}
767

768
/*
 * Return true when `dnodeId` occurs in `dnodeList` (an SArray of int32_t
 * dnode ids); false otherwise, including for an empty list.
 */
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t total = taosArrayGetSize(dnodeList);
  int32_t idx = 0;
  while (idx < total) {
    if (*(int32_t *)TARRAY_GET_ELEM(dnodeList, idx) == dnodeId) {
      return true;
    }
    ++idx;
  }
  return false;
}
778

779
#ifdef TD_ENTERPRISE
780
/*
 * Enterprise placement score:
 * (vnodes + ratio * other nodes + additionDnodes) / supported vnodes.
 * The score is negated for dnodes that already host vnodes, so an ascending
 * sort (mndCompareDnodeVnodes1) places them before empty dnodes.
 */
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;
  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
785

786
/*
 * Sort comparator for the enterprise dual-replica path. Orders dnodes by
 * mndGetDnodeScore1 (ratio 0.9) ascending and breaks ties by dnode id.
 * Returns -1, 0 or 1.
 */
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);
  if (lhs != rhs) {
    return (lhs > rhs) ? 1 : -1;
  }
  if (pDnode1->id == pDnode2->id) {
    return 0;
  }
  return (pDnode1->id > pDnode2->id) ? 1 : -1;
}
797

798
/*
 * sdbTraverse callback (enterprise): refresh a dnode's vnode count and mnode
 * flag, then append it to p1 (SArray of SDnodeObj) if it supports vnodes.
 * Online state is not checked here. Returns false only when the push fails.
 */
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pList = p1;

  bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    ++pDnode->numOfOtherNodes;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pList, pDnode) != NULL;
}
814

815
// TS-6191
816
/*
 * TS-6191: when grantCheckDualReplicaDnodes() holds, pre-select the dnodes
 * vnodes may be placed on.
 *
 * If the grant check is false, this returns 0 without writing *ppDnodeList.
 * Otherwise a new SArray of SDnodeObj is stored in *ppDnodeList; the caller
 * owns it, even on early return. It is filled with every dnode that supports
 * vnodes and sorted with mndCompareDnodeVnodes1, so dnodes already hosting
 * vnodes come first. It is then trimmed:
 *  - dnodeList empty/NULL: keep only the first two entries;
 *  - otherwise: if >= 2 dnodes host vnodes, keep only the first two; drop
 *    entries whose id is not in dnodeList; and if exactly one dnode hosts
 *    vnodes but it is no longer first, drop the last entry.
 *
 * nDnodes is only the initial capacity. Returns 0, or terrno if the array
 * cannot be allocated.
 */
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCandidates = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCandidates;

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCandidates, NULL, NULL);

  int32_t nCandidates = taosArrayGetSize(pCandidates);
  if (nCandidates <= 0) {
    TAOS_RETURN(code);
  }
  if (nCandidates > 1) {
    taosArraySort(pCandidates, (__compar_fn_t)mndCompareDnodeVnodes1);
  }

  int32_t nWanted = taosArrayGetSize(dnodeList);
  if (nWanted <= 0) {
    if (nCandidates > 2) taosArrayRemoveBatch(pCandidates, 2, nCandidates - 2, NULL);
    TAOS_RETURN(code);
  }

  // After sorting, dnodes already hosting vnodes form a prefix; measure it.
  int32_t nWithVnodes = 0;
  while (nWithVnodes < nCandidates && ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, nWithVnodes))->numOfVnodes > 0) {
    ++nWithVnodes;
  }

  int32_t pinnedId = -1;
  if (nWithVnodes == 1) {
    pinnedId = ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, 0))->id;
  } else if (nWithVnodes >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCandidates, 2, nCandidates - 2, NULL);
  }

  for (int32_t i = 0; i < TARRAY_SIZE(pCandidates);) {
    SDnodeObj *pDnode = taosArrayGet(pCandidates, i);
    if (isDnodeInList(dnodeList, pDnode->id)) {
      ++i;
    } else {
      taosArrayRemove(pCandidates, i);
    }
  }

  if (nWithVnodes == 1) {
    SDnodeObj *pFirst = taosArrayGet(pCandidates, 0);
    // The single vnode-hosting dnode was filtered out: drop the last entry.
    if (pFirst != NULL && pFirst->id != pinnedId) {
      taosArrayRemove(pCandidates, taosArrayGetSize(pCandidates) - 1);
    }
  }

  TAOS_RETURN(code);
}
874
#endif
875

876
/*
 * Build the array of dnodes eligible to host new vnodes.
 *
 * exceptDnodeId: a dnode id to leave out.
 * dnodeList:     optional SArray of int32_t dnode ids restricting the
 *                candidates; NULL or empty means no restriction.
 *
 * In enterprise builds, a dual-replica pre-selection (TS-6191), when produced,
 * replaces dnodeList as the restriction. Returns a new SArray of SDnodeObj
 * copies owned by the caller, or NULL on failure.
 */
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
  SArray *pDualList = NULL;

  SArray *pResult = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SArray *pFilter = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, pFilter, &pDualList) != 0) {
    taosArrayDestroy(pResult);
    return NULL;
  }
  if (pDualList != NULL) {
    pFilter = pDualList;
  }
#endif

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId, pFilter);

  int32_t nResult = (int32_t)taosArrayGetSize(pResult);
  mDebug("build %d dnodes array", nResult);
  for (int32_t i = 0; i < nResult; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pResult, i);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  taosArrayDestroy(pDualList);
  return pResult;
}
907

908
/*
 * Sort comparator over int32_t dnode ids: returns -1, 0 or 1 as *dnode1Id is
 * less than, equal to or greater than *dnode2Id.
 */
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  int32_t lhs = *dnode1Id;
  int32_t rhs = *dnode2Id;
  return (lhs > rhs) - (lhs < rhs);
}
914

915
/*
 * Load score used to rank dnodes for vnode placement:
 * (vnodes + ratio * other nodes + additionDnodes) / supported vnodes.
 * A lower score means a relatively less loaded dnode.
 */
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  return weightedLoad / pDnode->numOfSupportVnodes;
}
919

920
/*
 * Sort comparator ordering dnodes by mndGetDnodeScore (ratio 0.9) ascending.
 * Equal scores compare as 0. Returns -1, 0 or 1.
 */
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);
  if (lhs > rhs) return 1;
  if (lhs == rhs) return 0;
  return -1;
}
928

929
/*
 * Sort pVgroup->vnodeGid[0 .. replica) in place by ascending dnodeId.
 * The sort is stable: equal ids keep their relative order.
 */
void mndSortVnodeGid(SVgObj *pVgroup) {
  int32_t unsortedEnd = pVgroup->replica - 1;
  bool    swapped = true;
  // Each pass bubbles the largest remaining dnodeId to the end; stop early
  // once a pass makes no swap, since the prefix is then already ordered.
  while (swapped && unsortedEnd > 0) {
    swapped = false;
    for (int32_t j = 0; j < unsortedEnd; ++j) {
      if (pVgroup->vnodeGid[j].dnodeId > pVgroup->vnodeGid[j + 1].dnodeId) {
        TSWAP(pVgroup->vnodeGid[j], pVgroup->vnodeGid[j + 1]);
        swapped = true;
      }
    }
    --unsortedEnd;
  }
}
8,571✔
938

939
/*
 * Choose pVgroup->replica dnodes from pArray for pVgroup.
 *
 * pArray (SArray of SDnodeObj) is sorted in place with mndCompareDnodeVnodes
 * and its first `replica` entries are taken. Each chosen dnode's memUsed and
 * numOfVnodes are increased, so later calls on the same array see the added
 * load. A single-replica vnode starts as LEADER, otherwise as FOLLOWER.
 * Finally the vnode gids are sorted by dnode id.
 *
 * Returns 0, TSDB_CODE_MND_NO_ENOUGH_DNODES, TSDB_CODE_MND_NO_ENOUGH_VNODES or
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE. On failure, pVgroup and the counters
 * of dnodes chosen before the failure may already be modified.
 */
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t size = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < size; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9));
  }

  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
988

989
/*
 * Allocate a single-replica TSMA vgroup for database pDb into *pVgroup.
 *
 * The vgroup id is taken from sdbGetMaxId(SDB_VGROUP) and its dnode is chosen
 * by mndGetAvailableDnode among all eligible dnodes. Returns 0 on success, or
 * the error code (terrno also set via TAOS_RETURN) on failure. The temporary
 * dnode array is always freed.
 */
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;

  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  // pArray is a scratch copy of the dnodes: release it on success and failure
  // alike (it used to leak when no dnode was available).
  taosArrayDestroy(pArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1013

1014
/*
 * Allocate pDb->cfg.numOfVgroups vgroups for database pDb.
 *
 * Vgroup ids are consecutive, starting at sdbGetMaxId(SDB_VGROUP) but never
 * below 2. The uint32 hash space is split into equal ranges, and the last
 * vgroup's range extends to UINT32_MAX. Each vgroup gets
 * pDb->cfg.replications dnodes via mndGetAvailableDnode. The candidates come
 * from mndBuildDnodesArray, optionally restricted by dnodeList.
 *
 * On success stores a taosMemoryCalloc'ed array (owned by the caller) in
 * *ppVgroups and returns 0. On failure *ppVgroups is untouched and an error
 * code is returned.
 */
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  // Ids below 2 are never handed out here (presumably reserved).
  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Each vgroup's update time starts at its own creation time.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1077

1078
/*
 * Collect the endpoints (fqdn:port) of pVgroup's replicas into an SEpSet.
 *
 * Replicas whose dnode cannot be acquired are skipped. inUse is set to the
 * slot of a replica in LEADER or ASSIGNED_LEADER state (the last one seen).
 * A failure to add an endpoint is only logged. The set is passed through
 * epsetSort before being returned.
 */
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode == NULL) {
      continue;
    }

    bool leading = pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER;
    if (leading) {
      epset.inUse = epset.numOfEps;
    }

    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
    }
    mndReleaseDnode(pMnode, pDnode);
  }

  epsetSort(&epset);
  return epset;
}
1099

1100
/*
 * Look up vgroup `vgId` and collect its replicas' endpoints into an SEpSet.
 * Returns a zeroed epset (numOfEps == 0) when the vgroup cannot be acquired.
 * Replicas whose dnode cannot be acquired are skipped, and inUse marks the
 * LEADER / ASSIGNED_LEADER replica (the last one seen).
 * NOTE(review): unlike mndGetVgroupEpset, the result is not passed through
 * epsetSort; confirm that difference is intended.
 */
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet  epset = {0};
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode == NULL) {
      continue;
    }

    bool leading = pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER;
    if (leading) {
      epset.inUse = epset.numOfEps;
    }

    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
    }
    mndReleaseDnode(pMnode, pDnode);
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1124

1125
/*
 * Fill pBlock with up to `rows` rows describing vgroups, resuming the
 * SDB_VGROUP iteration stored in pShow->pIter. When pShow->db is non-empty,
 * only vgroups of that database are listed, and if the database cannot be
 * acquired no rows are produced.
 *
 * Columns per vgroup: vgId, db name, numOfTables. Then, for each of 4 replica
 * slots, (dnode id, role, "applied/commit" index), or three NULLs for unused
 * slots. Then cacheUsage, numOfCachedTables, isTsma and mountVgId.
 *
 * Returns the number of rows produced, or an error code on failure. Every
 * exit path releases the pDb reference acquired above.
 */
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  int64_t curMs = taosGetTimestampMs();
  int32_t code = 0, lino = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      // Leave through _OVER instead of returning directly, so the pDb
      // reference acquired above is released too.
      lino = __LINE__;
      goto _OVER;
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i < pVgroup->replica) {
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
        COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

        bool       exist = false;
        bool       online = false;
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
        if (pDnode != NULL) {
          exist = true;
          online = mndIsDnodeOnline(pDnode, curMs);
          mndReleaseDnode(pMnode, pDnode);
        }

        // Role text:
        //  - "dropping" when the dnode no longer exists;
        //  - "offline" when it is not online;
        //  - otherwise the sync state name. A (assigned) leader whose
        //    syncRestore is false gets the suffix "**" if syncCanRead is also
        //    false, or "*" if syncCanRead is true.
        char buf1[20] = {0};
        char role[20] = "offline";
        if (!exist) {
          tstrncpy(role, "dropping", sizeof(role));
        } else if (online) {
          char *star = "";
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
            if (!pVgroup->vnodeGid[i].syncRestore && !pVgroup->vnodeGid[i].syncCanRead) {
              star = "**";
            } else if (!pVgroup->vnodeGid[i].syncRestore && pVgroup->vnodeGid[i].syncCanRead) {
              star = "*";
            }
          }
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
        }
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
                 pVgroup->vnodeGid[i].syncCommitIndex);
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
      } else {
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
      }
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1279

1280
/* Abort an in-progress SDB_VGROUP fetch iteration identified by pIter. */
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1284

1285
/*
 * sdbTraverse callback over vgroups: add to *(int32_t *)p2 the number of this
 * vgroup's replicas placed on dnode *(int32_t *)p1. Always continues.
 */
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  const SVgObj *pVgroup = pObj;
  int32_t       targetDnode = *(int32_t *)p1;
  int32_t       hits = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    hits += (pVgroup->vnodeGid[v].dnodeId == targetDnode) ? 1 : 0;
  }
  *(int32_t *)p2 += hits;
  return true;
}
1298

1299
/* Count the vnodes (vgroup replicas) currently placed on dnode `dnodeId`. */
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t count = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &count, NULL);
  return count;
}
1304

1305
/*
 * Estimate the memory, in bytes, one vnode of pVgroup needs:
 * cfg.buffer * 1024 * 1024 + cfg.pages * cfg.pageSize * 1024, plus
 * cfg.cacheLastSize * 1024 * 1024 when cfg.cacheLast > 0.
 *
 * pDbInput: the vgroup's database if the caller already holds it. When NULL,
 * the database is acquired by pVgroup->dbName and released before returning.
 * Returns 0 if no database is available.
 */
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  bool    ownsDb = (pDbInput == NULL);
  SDbObj *pDb = ownsDb ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;

  int64_t totalBytes = 0;
  if (pDb != NULL) {
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
    totalBytes = buffer + cache;
    if (pDb->cfg.cacheLast > 0) {
      totalBytes += cacheLast;
    }
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  if (ownsDb) {
    mndReleaseDb(pMnode, pDb);
  }
  return totalBytes;
}
1329

1330
/*
 * sdbTraverse callback over vgroups: for each replica of this vgroup on dnode
 * *(int32_t *)p1, add mndGetVgroupMemory(pVgroup) to *(int64_t *)p2.
 * Always continues traversal.
 */
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVgroup = pObj;
  int32_t  targetDnode = *(int32_t *)p1;
  int64_t *pTotal = (int64_t *)p2;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    if (pVgroup->vnodeGid[v].dnodeId != targetDnode) {
      continue;
    }
    *pTotal += mndGetVgroupMemory(pMnode, NULL, pVgroup);
  }
  return true;
}
1343

1344
/* Sum the estimated memory (mndGetVgroupMemory) of every vnode on dnode `dnodeId`. */
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t total = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);
  return total;
}
1349

1350
/*
 * Format the estimated time needed to apply `applyCount` outstanding entries
 * as "H:M:S" into restoreStr (at most restoreStrSize bytes, via snprintf).
 *
 * applyCount / rate is interpreted as milliseconds. A rate that is not
 * strictly positive (zero, negative or NaN), or a non-positive applyCount,
 * gives no meaningful estimate and yields "0:0:0".
 *
 * Estimates that do not fit in int64_t milliseconds (e.g. a tiny rate) are
 * clamped to INT64_MAX. Converting an out-of-range double to int64_t is
 * undefined behaviour.
 */
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  if (!(rate > 0) || applyCount <= 0) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  double  costMs = (double)applyCount / rate;
  int64_t costTime = (costMs >= (double)INT64_MAX) ? INT64_MAX : (int64_t)costMs;

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1364

1365
/*
 * Fill pBlock with one row per vnode (vgroup replica), resuming the
 * SDB_VGROUP iteration kept in pShow->pIter.
 *
 * A new vgroup is started only while numOfRows < rows - TSDB_MAX_REPLICA,
 * presumably so all replicas of a started vgroup fit in the block.
 *
 * Columns per row:
 *  - dnode id, vgId, db name;
 *  - sync state (TAOS_SYNC_STATE_OFFLINE if the dnode is not online);
 *  - role time and start time (0 if offline);
 *  - syncRestore;
 *  - estimated restore finish time;
 *  - unapplied (commit - applied) count;
 *  - buffer segment used and buffer segment size.
 * If a replica's dnode cannot be acquired, the rest of that vgroup is skipped.
 *
 * Returns the number of rows produced, or an error code. On error, the vgroup
 * and dnode references held for the current row are released and the SDB
 * iteration is cancelled (pShow->pIter reset to NULL).
 */
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  SVgObj    *pVgroup = NULL;
  SDnodeObj *pDnode = NULL;
  int32_t    cols = 0;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    code = 0;

  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      // dnode is online?
      pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);

      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncState, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set roleTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set startTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      // Sized for the varstr header plus the longest string restoreStr can
      // hold; the 20-byte `buf` used previously could overflow by one byte.
      char restoreBuf[sizeof(restoreStr) + VARSTR_HEADER_SIZE] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      STR_TO_VARSTR(restoreBuf, restoreStr);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)restoreBuf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore finish time, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&unappliedCount, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->bufferSegmentUsed, false);
      if (code != 0) {
        mError("vgId:%d, failed to set buffer segment used, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->bufferSegmentSize, false);
      if (code != 0) {
        mError("vgId:%d, failed to set buffer segment size, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      numOfRows++;
      mndReleaseDnode(pMnode, pDnode);
      pDnode = NULL;
    }

    sdbRelease(pSdb, pVgroup);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

_OVER:
  // Error path: drop the references held for the current row and end the
  // iteration instead of leaking them.
  if (pDnode != NULL) {
    mndReleaseDnode(pMnode, pDnode);
  }
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pShow->pIter);
  pShow->pIter = NULL;
  return code;
}
1496

1497
/*
 * Abort an in-progress SDB_VGROUP fetch identified by pIter, letting the sdb
 * layer drop whatever the iterator still holds.
 */
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1501

1502
/*
 * Choose a dnode from pArray to host one more replica of pVgroup and record it.
 *
 * Selection:
 * - pArray is sorted with mndCompareDnodeVnodes and scanned front to back.
 * - The first dnode that does not already host a vnode of this vgroup is selected.
 * - If that dnode has no free vnode slot or not enough memory for the vgroup, an
 *   error is returned; the next candidate is not tried.
 *
 * On success:
 * - The new vnode is written to vnodeGid[replica] with sync state OFFLINE.
 * - pVgroup->replica and the selected dnode's numOfVnodes/memUsed are increased.
 * - The encoded vgroup is appended to pTrans as a group redo log and marked READY.
 *
 * Returns:
 * - 0 on success.
 * - TSDB_CODE_MND_NO_ENOUGH_DNODES when no eligible dnode exists.
 * - TSDB_CODE_MND_NO_ENOUGH_VNODES or TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE when the
 *   selected dnode is full.
 * - Otherwise, the error from encoding, appending, or setting the raw status.
 */
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // validate before pDnode is dereferenced by the membership scan below
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    // skip dnodes that already host a vnode of this vgroup
    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1572

1573
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
47✔
1574
                                        SVnodeGid *pDelVgid) {
1575
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
47✔
1576
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
173✔
1577
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
126✔
1578
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
126!
1579
          pDnode->numOfOtherNodes);
1580
  }
1581

1582
  int32_t code = -1;
47✔
1583
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
48!
1584
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
48✔
1585

1586
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
85✔
1587
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
84✔
1588
      if (pVgid->dnodeId == pDnode->id) {
84✔
1589
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
47✔
1590
        pDnode->memUsed -= vgMem;
47✔
1591
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
47!
1592
              " used:%" PRId64,
1593
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1594
        pDnode->numOfVnodes--;
47✔
1595
        pVgroup->replica--;
47✔
1596
        *pDelVgid = *pVgid;
47✔
1597
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
47✔
1598
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
47✔
1599
        code = 0;
47✔
1600
        goto _OVER;
47✔
1601
      }
1602
    }
1603
  }
1604

1605
_OVER:
×
1606
  if (code != 0) {
47!
1607
    code = TSDB_CODE_APP_ERROR;
×
1608
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
×
1609
           tstrerror(code));
1610
    TAOS_RETURN(code);
×
1611
  }
1612

1613
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
134✔
1614
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
87✔
1615
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
87!
1616
          pVgid->dnodeId);
1617
  }
1618

1619
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
47✔
1620
  if (pVgRaw == NULL) {
47!
1621
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1622
    if (terrno != 0) code = terrno;
×
1623
    TAOS_RETURN(code);
×
1624
  }
1625
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
47!
1626
    sdbFreeRaw(pVgRaw);
×
1627
    TAOS_RETURN(code);
×
1628
  }
1629
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
47✔
1630
  if (code != 0) {
47!
1631
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1632
           tstrerror(code), __LINE__);
1633
  }
1634

1635
  TAOS_RETURN(code);
47✔
1636
}
1637

1638
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1639
                                                   SVnodeGid *pDelVgid) {
1640
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1641
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1642
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1643
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1644
  }
1645

1646
  int32_t code = -1;
×
1647
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1648
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1649

1650
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1651
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1652
      if (pVgid->dnodeId == pDnode->id) {
×
1653
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1654
        pDnode->memUsed -= vgMem;
×
1655
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1656
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1657
        pDnode->numOfVnodes--;
×
1658
        pVgroup->replica--;
×
1659
        *pDelVgid = *pVgid;
×
1660
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1661
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1662
        code = 0;
×
1663
        goto _OVER;
×
1664
      }
1665
    }
1666
  }
1667

1668
_OVER:
×
1669
  if (code != 0) {
×
1670
    code = TSDB_CODE_APP_ERROR;
×
1671
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1672
    TAOS_RETURN(code);
×
1673
  }
1674

1675
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1676
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1677
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1678
  }
1679

1680
  TAOS_RETURN(code);
×
1681
}
1682

1683
/*
 * Append a TDMT_DND_CREATE_VNODE redo action to pTrans.
 *
 * The target dnode is pVgid->dnodeId. The request is built from pDb and pVgroup,
 * and the action is grouped by pVgroup->vgId. TSDB_CODE_VND_ALREADY_EXIST is
 * accepted as success.
 *
 * Returns 0 on success, or an error code if the dnode cannot be acquired, the
 * request cannot be built, or the action cannot be appended.
 */
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  // pDnode is read by the request builder, so keep the reference until it returns
  void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1709

1710
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1711
                                       SDnodeObj *pDnode) {
1712
  int32_t      code = 0;
10✔
1713
  STransAction action = {0};
10✔
1714

1715
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1716

1717
  int32_t contLen = 0;
10✔
1718
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
10✔
1719
  if (pReq == NULL) {
10!
1720
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1721
    if (terrno != 0) code = terrno;
×
1722
    TAOS_RETURN(code);
×
1723
  }
1724

1725
  action.pCont = pReq;
10✔
1726
  action.contLen = contLen;
10✔
1727
  action.msgType = TDMT_DND_CREATE_VNODE;
10✔
1728
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
10✔
1729
  action.groupId = pVgroup->vgId;
10✔
1730

1731
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
1732
    taosMemoryFree(pReq);
×
1733
    TAOS_RETURN(code);
×
1734
  }
1735

1736
  TAOS_RETURN(code);
10✔
1737
}
1738

1739
/*
 * Append a TDMT_VND_ALTER_CONFIRM redo action for pVgroup.
 *
 * The message consists of a bare SMsgHead carrying the vgId. The action is sent
 * to the vgroup's epset and grouped by vgId.
 */
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction action = {.epSet = mndGetVgroupEpset(pMnode, pVgroup)};

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);
  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(headLen);
  if (pMsg == NULL) {
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(headLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  action.pCont = pMsg;
  action.contLen = headLen;
  action.msgType = TDMT_VND_ALTER_CONFIRM;
  // an incorrect redirect result surfaces as this error, so it is retried
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  action.groupId = pVgroup->vgId;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1768

1769
/*
 * Append a TDMT_SYNC_CONFIG_CHANGE redo action carrying the alter-replica request
 * for pNewVgroup (built for dnodeId).
 *
 * The serialized request is prefixed with an SMsgHead holding the total length
 * and vgId. pOldVgroup is not used.
 */
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  STransAction action = {.epSet = mndGetVgroupEpset(pMnode, pNewVgroup)};

  int32_t bodyLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  // message layout: [SMsgHead][serialized replica request]
  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_SYNC_CONFIG_CHANGE;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1808

1809
/*
 * Append a TDMT_VND_ALTER_HASHRANGE redo action that moves pVgroup's hash range
 * from srcVgId. TSDB_CODE_VND_ALREADY_EXIST is accepted as success. Logs once
 * the action has been queued.
 */
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  STransAction action = {.epSet = mndGetVgroupEpset(pMnode, pVgroup)};

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &reqLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pBody;
  action.contLen = reqLen;
  action.msgType = TDMT_VND_ALTER_HASHRANGE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1835

1836
/*
 * Append a TDMT_VND_ALTER_CONFIG redo action pushing pDb's configuration to
 * every vnode of pVgroup. The action is grouped by pVgroup->vgId.
 */
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction action = {.epSet = mndGetVgroupEpset(pMnode, pVgroup)};

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &reqLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pBody;
  action.contLen = reqLen;
  action.msgType = TDMT_VND_ALTER_CONFIG;
  action.groupId = pVgroup->vgId;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1861

1862
/*
 * Encode pVg, append it to pTrans as a prepare log, and mark the raw CREATING.
 *
 * Ownership of the raw:
 * - If appending fails, the raw is freed here.
 * - Once appended, the raw is treated as owned by pTrans and is not freed here.
 *
 * Returns 0 on success, or the error from encoding, appending, or setting the
 * raw status.
 */
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // propagate the status error instead of only logging it; pRaw now belongs to pTrans
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
1886

1887
/*
 * Append a TDMT_VND_ALTER_REPLICA redo action, sent to dnodeId.
 *
 * The request describes pVgroup's current replica set. The action is grouped by
 * pVgroup->vgId.
 */
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  STransAction action = {.epSet = mndGetDnodeEpset(pTarget)};
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pBody;
  action.contLen = reqLen;
  action.msgType = TDMT_VND_ALTER_REPLICA;
  action.groupId = pVgroup->vgId;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1920

1921
/*
 * Append a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action, sent to dnodeId.
 *
 * Response handling:
 * - TSDB_CODE_VND_ALREADY_IS_VOTER is accepted as success.
 * - TSDB_CODE_VND_NOT_CATCH_UP causes a retry.
 *
 * No groupId is set on this action.
 */
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  STransAction action = {.epSet = mndGetDnodeEpset(pTarget)};
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pBody;
  action.contLen = reqLen;
  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1955

1956
/*
 * Append a TDMT_DND_ALTER_VNODE_TYPE redo action, sent to dnodeId.
 *
 * The payload is the alter-replica request for pVgroup.
 * Response handling:
 * - TSDB_CODE_VND_ALREADY_IS_VOTER is accepted as success.
 * - TSDB_CODE_VND_NOT_CATCH_UP causes a retry.
 *
 * The action is grouped by pVgroup->vgId.
 */
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  STransAction action = {.epSet = mndGetDnodeEpset(pTarget)};
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pBody;
  action.contLen = reqLen;
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.groupId = pVgroup->vgId;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1991

1992
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1993
                                          SDnodeObj *pDnode) {
1994
  int32_t      code = 0;
10✔
1995
  STransAction action = {0};
10✔
1996
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1997

1998
  int32_t contLen = 0;
10✔
1999
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
10✔
2000
  if (pReq == NULL) {
10!
2001
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2002
    if (terrno != 0) code = terrno;
×
2003
    TAOS_RETURN(code);
×
2004
  }
2005

2006
  action.pCont = pReq;
10✔
2007
  action.contLen = contLen;
10✔
2008
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
10✔
2009
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
10✔
2010
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
10✔
2011
  action.groupId = pVgroup->vgId;
10✔
2012

2013
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
2014
    taosMemoryFree(pReq);
×
2015
    TAOS_RETURN(code);
×
2016
  }
2017

2018
  TAOS_RETURN(code);
10✔
2019
}
2020

2021
/*
 * Append a TDMT_VND_DISABLE_WRITE redo action for pVgroup->vgId, sent to
 * dnodeId. No groupId is set on this action.
 */
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  STransAction action = {.epSet = mndGetDnodeEpset(pTarget)};
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pBody == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pBody;
  action.contLen = reqLen;
  action.msgType = TDMT_VND_DISABLE_WRITE;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
2054

2055
/*
 * Append a TDMT_DND_DROP_VNODE action for pVgroup to pTrans.
 *
 * - The target dnode is pVgid->dnodeId.
 * - The action goes to the redo list when isRedo is true, otherwise to the undo list.
 * - TSDB_CODE_VND_NOT_EXIST is accepted as success.
 * - The action is grouped by pVgroup->vgId.
 *
 * Returns 0 on success, or an error code if the dnode cannot be acquired, the
 * request cannot be built, or the action cannot be appended.
 */
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  // pDnode is read by the request builder, so keep the reference until it returns
  void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  code = isRedo ? mndTransAppendRedoAction(pTrans, &action) : mndTransAppendUndoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2097

2098
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
29✔
2099
                                    SArray *pArray, bool force, bool unsafe) {
2100
  int32_t code = 0;
29✔
2101
  SVgObj  newVg = {0};
29✔
2102
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
29✔
2103

2104
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
29!
2105
  for (int32_t i = 0; i < newVg.replica; ++i) {
94✔
2106
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
65!
2107
  }
2108

2109
  if (!force) {
29!
2110
#if 1
2111
    {
2112
#else
2113
    if (newVg.replica == 1) {
2114
#endif
2115
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
29!
2116
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
29!
2117
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
94✔
2118
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
65!
2119
      }
2120
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
29!
2121
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
29!
2122

2123
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
29!
2124
      newVg.replica--;
29✔
2125
      SVnodeGid del = newVg.vnodeGid[vnIndex];
29✔
2126
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
29✔
2127
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
29✔
2128
      {
2129
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
29✔
2130
        if (pRaw == NULL) {
29!
2131
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2132
          if (terrno != 0) code = terrno;
×
2133
          TAOS_RETURN(code);
×
2134
        }
2135
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
29!
2136
          sdbFreeRaw(pRaw);
×
2137
          TAOS_RETURN(code);
×
2138
        }
2139
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
29✔
2140
        if (code != 0) {
29!
2141
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2142
          return code;
×
2143
        }
2144
      }
2145

2146
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
29!
2147
      for (int32_t i = 0; i < newVg.replica; ++i) {
94✔
2148
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
65!
2149
      }
2150
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
29!
2151
#if 1
2152
    }
2153
#else
2154
    } else {  // new replica == 3
2155
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2156
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2157
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2158
      newVg.replica--;
2159
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2160
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2161
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2162
      {
2163
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2164
        if (pRaw == NULL) return -1;
2165
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2166
          sdbFreeRaw(pRaw);
2167
          return -1;
2168
        }
2169
      }
2170

2171
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2172
      for (int32_t i = 0; i < newVg.replica; ++i) {
2173
        if (i == vnIndex) continue;
2174
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2175
      }
2176
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2177
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2178
    }
2179
#endif
2180
  } else {
2181
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2182
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2183
    newVg.replica--;
×
2184
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2185
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2186
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2187
    {
2188
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2189
      if (pRaw == NULL) {
×
2190
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2191
        if (terrno != 0) code = terrno;
×
2192
        TAOS_RETURN(code);
×
2193
      }
2194
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2195
        sdbFreeRaw(pRaw);
×
2196
        TAOS_RETURN(code);
×
2197
      }
2198
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2199
      if (code != 0) {
×
2200
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2201
        return code;
×
2202
      }
2203
    }
2204

2205
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2206
      if (i != vnIndex) {
×
2207
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2208
      }
2209
    }
2210
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2211
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2212

2213
    if (newVg.replica == 1) {
×
2214
      if (force && !unsafe) {
×
2215
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2216
      }
2217

2218
      SSdb *pSdb = pMnode->pSdb;
×
2219
      void *pIter = NULL;
×
2220

2221
      while (1) {
×
2222
        SStbObj *pStb = NULL;
×
2223
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2224
        if (pIter == NULL) break;
×
2225

2226
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2227
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2228
            sdbCancelFetch(pSdb, pIter);
×
2229
            sdbRelease(pSdb, pStb);
×
2230
            TAOS_RETURN(code);
×
2231
          }
2232
        }
2233

2234
        sdbRelease(pSdb, pStb);
×
2235
      }
2236

2237
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2238
    }
2239
  }
2240

2241
  {
2242
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
29✔
2243
    if (pRaw == NULL) {
29!
2244
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2245
      if (terrno != 0) code = terrno;
×
2246
      TAOS_RETURN(code);
×
2247
    }
2248
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
29!
2249
      sdbFreeRaw(pRaw);
×
2250
      TAOS_RETURN(code);
×
2251
    }
2252
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
29✔
2253
    if (code != 0) {
29!
2254
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2255
      return code;
×
2256
    }
2257
  }
2258

2259
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
29!
2260
  for (int32_t i = 0; i < newVg.replica; ++i) {
94✔
2261
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
65!
2262
  }
2263
  TAOS_RETURN(code);
29✔
2264
}
2265

2266
/*
 * For every vgroup with a vnode on delDnodeId, append to pTrans the actions that
 * move that vnode to another dnode.
 *
 * - Candidate dnodes come from mndBuildDnodesArray(pMnode, delDnodeId, NULL).
 * - Each move is delegated to mndSetMoveVgroupInfoToTrans with `force` and `unsafe`.
 * - Iteration stops at the first error, which is returned. This includes failing
 *   to acquire a vgroup's database.
 */
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // pDb is passed to the request builders downstream, so a NULL here must not be forwarded
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, trans:%d, failed to acquire db:%s since %s", pVgroup->vgId, pTrans->id, pVgroup->dbName,
               tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2309

2310
/*
 * Grows pVgroup by one member on dnode newDnodeId and appends the matching steps to pTrans.
 * pVgroup is modified in place: the new member is placed in the slot after the current last member, with
 * syncState OFFLINE and role LEARNER, and replica is incremented. The steps, in order, are:
 *   - the encoded vgroup as a redo log entry (raw status READY);
 *   - an alter-replica action for every pre-existing member;
 *   - a create-vnode action for the new member;
 *   - the new member switched to VOTER with an alter-vnode-type action;
 *   - alter-replica actions for the pre-existing members again;
 *   - a confirm action.
 * Returns 0, or the error code of the first step that fails.
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // new member goes into the first free slot, starting out as an offline learner
  SVnodeGid *pAdded = &pVgroup->vnodeGid[pVgroup->replica];
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;
  pVgroup->replica++;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    // not appended, so the raw is still ours to free
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // phase 1: pre-existing members (all but the new last slot) are altered while the new vnode is a learner
  for (int32_t m = 0; m < pVgroup->replica - 1; ++m) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[m].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // phase 2: promote the new member to voter, then alter the pre-existing members again
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  for (int32_t m = 0; m < pVgroup->replica - 1; ++m) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[m].dnodeId));
  }

  // phase 3: confirm
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2356

2357
/*
 * Shrinks pVgroup by removing its member on dnode delDnodeId and appends the matching steps to pTrans.
 * The steps are: the encoded vgroup as a redo log entry (raw status READY), a drop-vnode action for the removed
 * member, an alter-replica action for every remaining member, and a confirm action.
 * pVgroup is modified in place. replica is decremented, the removed slot is refilled with the former last
 * member, and the vacated tail slot is zeroed, so member order is not preserved.
 * Returns 0 without touching pVgroup or pTrans if delDnodeId is not a member. Otherwise returns 0 or the error
 * code of the first step that fails.
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  int32_t delIndex = -1;
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      delIndex = i;
      break;
    }
  }

  if (delIndex == -1) return 0;

  SVnodeGid delGid = {0};
  memcpy(&delGid, &pVgroup->vnodeGid[delIndex], sizeof(SVnodeGid));

  pVgroup->replica--;
  int32_t lastIndex = pVgroup->replica;
  if (delIndex != lastIndex) {
    // Fill the hole with the last member. When the removed member already is the last one, source and
    // destination would be the same object, and memcpy on overlapping regions is undefined behavior.
    memcpy(&pVgroup->vnodeGid[delIndex], &pVgroup->vnodeGid[lastIndex], sizeof(SVnodeGid));
  }
  memset(&pVgroup->vnodeGid[lastIndex], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2402

2403
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
68✔
2404
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2405
                                     SDnodeObj *pOld3) {
2406
  int32_t code = -1;
68✔
2407
  STrans *pTrans = NULL;
68✔
2408

2409
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
68✔
2410
  if (pTrans == NULL) {
68!
2411
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2412
    if (terrno != 0) code = terrno;
×
2413
    goto _OVER;
×
2414
  }
2415

2416
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
68✔
2417
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
68✔
2418

2419
  mndTransSetSerial(pTrans);
67✔
2420
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
67!
2421

2422
  SVgObj newVg = {0};
67✔
2423
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
67✔
2424
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
67!
2425
  for (int32_t i = 0; i < newVg.replica; ++i) {
212✔
2426
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
145!
2427
          syncStr(newVg.vnodeGid[i].syncState));
2428
  }
2429

2430
  if (pNew1 != NULL && pOld1 != NULL) {
67!
2431
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
67✔
2432
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
67✔
2433
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
1!
2434
             pNew1->numOfSupportVnodes);
2435
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
1✔
2436
      goto _OVER;
1✔
2437
    }
2438

2439
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
66✔
2440
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
66!
2441
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2442
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2443
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2444
      goto _OVER;
×
2445
    } else {
2446
      pNew1->memUsed += vgMem;
66✔
2447
    }
2448

2449
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
66!
2450
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
66!
2451
  }
2452

2453
  if (pNew2 != NULL && pOld2 != NULL) {
66!
2454
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
16✔
2455
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
16!
2456
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2457
             pNew2->numOfSupportVnodes);
2458
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2459
      goto _OVER;
×
2460
    }
2461
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
16✔
2462
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
16!
2463
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2464
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2465
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2466
      goto _OVER;
×
2467
    } else {
2468
      pNew2->memUsed += vgMem;
16✔
2469
    }
2470
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
16!
2471
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
16!
2472
  }
2473

2474
  if (pNew3 != NULL && pOld3 != NULL) {
66!
2475
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
8✔
2476
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
8!
2477
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2478
             pNew3->numOfSupportVnodes);
2479
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2480
      goto _OVER;
×
2481
    }
2482
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
8✔
2483
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
8!
2484
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2485
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2486
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2487
      goto _OVER;
×
2488
    } else {
2489
      pNew3->memUsed += vgMem;
8✔
2490
    }
2491
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
8!
2492
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
8!
2493
  }
2494

2495
  {
2496
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
66✔
2497
    if (pRaw == NULL) {
66!
2498
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2499
      if (terrno != 0) code = terrno;
×
2500
      goto _OVER;
×
2501
    }
2502
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
66!
2503
      sdbFreeRaw(pRaw);
×
2504
      goto _OVER;
×
2505
    }
2506
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
66✔
2507
    if (code != 0) {
66!
2508
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2509
      goto _OVER;
×
2510
    }
2511
  }
2512

2513
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
66!
2514
  for (int32_t i = 0; i < newVg.replica; ++i) {
208✔
2515
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
142!
2516
  }
2517

2518
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
66✔
2519
  code = 0;
56✔
2520

2521
_OVER:
68✔
2522
  mndTransDrop(pTrans);
68✔
2523
  mndReleaseDb(pMnode, pDb);
68✔
2524
  TAOS_RETURN(code);
68✔
2525
}
2526

2527
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
84✔
2528
  SMnode    *pMnode = pReq->info.node;
84✔
2529
  SDnodeObj *pNew1 = NULL;
84✔
2530
  SDnodeObj *pNew2 = NULL;
84✔
2531
  SDnodeObj *pNew3 = NULL;
84✔
2532
  SDnodeObj *pOld1 = NULL;
84✔
2533
  SDnodeObj *pOld2 = NULL;
84✔
2534
  SDnodeObj *pOld3 = NULL;
84✔
2535
  SVgObj    *pVgroup = NULL;
84✔
2536
  SDbObj    *pDb = NULL;
84✔
2537
  int32_t    code = -1;
84✔
2538
  int64_t    curMs = taosGetTimestampMs();
84✔
2539
  int32_t    newDnodeId[3] = {0};
84✔
2540
  int32_t    oldDnodeId[3] = {0};
84✔
2541
  int32_t    newIndex = -1;
84✔
2542
  int32_t    oldIndex = -1;
84✔
2543

2544
  SRedistributeVgroupReq req = {0};
84✔
2545
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
84!
2546
    code = TSDB_CODE_INVALID_MSG;
×
2547
    goto _OVER;
×
2548
  }
2549

2550
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
84!
2551
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
84✔
2552
    goto _OVER;
1✔
2553
  }
2554

2555
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
83✔
2556
  if (pVgroup == NULL) {
83✔
2557
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
3✔
2558
    if (terrno != 0) code = terrno;
3!
2559
    goto _OVER;
3✔
2560
  }
2561
  if (pVgroup->mountVgId) {
80!
2562
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
2563
    goto _OVER;
×
2564
  }
2565
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
80✔
2566
  if (pDb == NULL) {
80!
2567
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2568
    if (terrno != 0) code = terrno;
×
2569
    goto _OVER;
×
2570
  }
2571

2572
  if (pVgroup->replica == 1) {
80✔
2573
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
31!
2574
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2575
      goto _OVER;
×
2576
    }
2577

2578
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
31!
2579
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2580
      code = 0;
×
2581
      goto _OVER;
×
2582
    }
2583

2584
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
31✔
2585
    if (pNew1 == NULL) {
31!
2586
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2587
      if (terrno != 0) code = terrno;
×
2588
      goto _OVER;
×
2589
    }
2590
    if (!mndIsDnodeOnline(pNew1, curMs)) {
31!
2591
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2592
      goto _OVER;
×
2593
    }
2594

2595
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
31✔
2596
    if (pOld1 == NULL) {
31!
2597
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2598
      if (terrno != 0) code = terrno;
×
2599
      goto _OVER;
×
2600
    }
2601
    if (!mndIsDnodeOnline(pOld1, curMs)) {
31✔
2602
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2603
      goto _OVER;
2✔
2604
    }
2605

2606
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
29✔
2607

2608
  } else if (pVgroup->replica == 3) {
49!
2609
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
49!
2610
      code = TSDB_CODE_MND_INVALID_REPLICA;
4✔
2611
      goto _OVER;
4✔
2612
    }
2613

2614
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
45!
2615
      code = TSDB_CODE_MND_INVALID_REPLICA;
1✔
2616
      goto _OVER;
1✔
2617
    }
2618

2619
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
44✔
2620
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
22✔
2621
      newDnodeId[++newIndex] = req.dnodeId1;
19✔
2622
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
19!
2623
    }
2624

2625
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
44✔
2626
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
29✔
2627
      newDnodeId[++newIndex] = req.dnodeId2;
22✔
2628
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
22!
2629
    }
2630

2631
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
44✔
2632
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
36✔
2633
      newDnodeId[++newIndex] = req.dnodeId3;
29✔
2634
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
29!
2635
    }
2636

2637
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
44✔
2638
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
28✔
2639
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
25✔
2640
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
25!
2641
    }
2642

2643
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
44✔
2644
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
23✔
2645
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
18✔
2646
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
18!
2647
    }
2648

2649
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
44✔
2650
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
34✔
2651
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
27✔
2652
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
27!
2653
    }
2654

2655
    if (newDnodeId[0] != 0) {
44✔
2656
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
42✔
2657
      if (pNew1 == NULL) {
42!
2658
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2659
        if (terrno != 0) code = terrno;
×
2660
        goto _OVER;
×
2661
      }
2662
      if (!mndIsDnodeOnline(pNew1, curMs)) {
42✔
2663
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1✔
2664
        goto _OVER;
1✔
2665
      }
2666
    }
2667

2668
    if (newDnodeId[1] != 0) {
43✔
2669
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
18✔
2670
      if (pNew2 == NULL) {
18!
2671
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2672
        if (terrno != 0) code = terrno;
×
2673
        goto _OVER;
×
2674
      }
2675
      if (!mndIsDnodeOnline(pNew2, curMs)) {
18!
2676
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2677
        goto _OVER;
×
2678
      }
2679
    }
2680

2681
    if (newDnodeId[2] != 0) {
43✔
2682
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
10✔
2683
      if (pNew3 == NULL) {
10!
2684
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2685
        if (terrno != 0) code = terrno;
×
2686
        goto _OVER;
×
2687
      }
2688
      if (!mndIsDnodeOnline(pNew3, curMs)) {
10!
2689
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2690
        goto _OVER;
×
2691
      }
2692
    }
2693

2694
    if (oldDnodeId[0] != 0) {
43✔
2695
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
41✔
2696
      if (pOld1 == NULL) {
41!
2697
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2698
        if (terrno != 0) code = terrno;
×
2699
        goto _OVER;
×
2700
      }
2701
      if (!mndIsDnodeOnline(pOld1, curMs)) {
41✔
2702
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2703
        goto _OVER;
2✔
2704
      }
2705
    }
2706

2707
    if (oldDnodeId[1] != 0) {
41✔
2708
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
16✔
2709
      if (pOld2 == NULL) {
16!
2710
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2711
        if (terrno != 0) code = terrno;
×
2712
        goto _OVER;
×
2713
      }
2714
      if (!mndIsDnodeOnline(pOld2, curMs)) {
16!
2715
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2716
        goto _OVER;
×
2717
      }
2718
    }
2719

2720
    if (oldDnodeId[2] != 0) {
41✔
2721
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
8✔
2722
      if (pOld3 == NULL) {
8!
2723
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2724
        if (terrno != 0) code = terrno;
×
2725
        goto _OVER;
×
2726
      }
2727
      if (!mndIsDnodeOnline(pOld3, curMs)) {
8!
2728
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2729
        goto _OVER;
×
2730
      }
2731
    }
2732

2733
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
41!
2734
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2735
      code = 0;
2✔
2736
      goto _OVER;
2✔
2737
    }
2738

2739
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
39✔
2740

2741
  } else {
2742
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2743
    goto _OVER;
×
2744
  }
2745

2746
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
68✔
2747

2748
  char obj[33] = {0};
68✔
2749
  (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
68✔
2750

2751
  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
68✔
2752

2753
_OVER:
84✔
2754
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
84✔
2755
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
26!
2756
           req.dnodeId3, tstrerror(code));
2757
  }
2758

2759
  mndReleaseDnode(pMnode, pNew1);
84✔
2760
  mndReleaseDnode(pMnode, pNew2);
84✔
2761
  mndReleaseDnode(pMnode, pNew3);
84✔
2762
  mndReleaseDnode(pMnode, pOld1);
84✔
2763
  mndReleaseDnode(pMnode, pOld2);
84✔
2764
  mndReleaseDnode(pMnode, pOld3);
84✔
2765
  mndReleaseVgroup(pMnode, pVgroup);
84✔
2766
  mndReleaseDb(pMnode, pDb);
84✔
2767
  tFreeSRedistributeVgroupReq(&req);
84✔
2768

2769
  TAOS_RETURN(code);
84✔
2770
}
2771

2772
/*
 * Builds the TDMT_SYNC_FORCE_FOLLOWER payload for pVgroup.
 * The payload is an SMsgHead (contLen and vgId in network byte order) followed by the serialized
 * SForceBecomeFollowerReq.
 * On success, returns a taosMemoryMalloc'ed buffer and stores its total length in *pContLen; the caller frees it
 * or hands it on. On failure, returns NULL with terrno set.
 * pMnode and dnodeId are not used.
 */
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  (void)pMnode;
  (void)dnodeId;

  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Only bodyLen bytes follow the header. Passing contLen here would overstate the writable space by
  // sizeof(SMsgHead), so the serializer could not detect a write past the end of the allocation.
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
2802

2803
/*
 * Appends to pTrans a redo action that sends TDMT_SYNC_FORCE_FOLLOWER for pVgroup to dnode dnodeId's endpoint.
 * The request buffer is freed here if appending fails. On success it is not freed here; presumably the
 * transaction owns it afterwards.
 * Returns 0, or the error of the failing step: dnode lookup, request build, or append.
 */
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  // only the endpoint set is needed from the dnode, so its reference is dropped right away
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
2835

2836
/*
 * Adds to pTrans a force-follower action for the dnode of the first replica of pVgroup whose syncState is
 * LEADER, provided that dnode exists and is online.
 * Returns:
 *   - -1 (a raw value, not a TSDB code) when replica <= 1;
 *   - the error code when adding the action or acquiring the vgroup's db fails;
 *   - 0 otherwise, including when no existing, online leader dnode was found (then nothing is added).
 */
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans) {
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // dnode of the first replica reporting LEADER; stays 0 when none does
  int32_t dnodeId = 0;
  for (int32_t idx = 0; idx < replica; ++idx) {
    if (pVgroup->vnodeGid[idx].syncState != TAOS_SYNC_STATE_LEADER) continue;
    dnodeId = pVgroup->vnodeGid[idx].dnodeId;
    break;
  }

  int64_t    curMs = taosGetTimestampMs();
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  bool       exist = (pDnode != NULL);
  bool       online = false;
  if (exist) {
    online = mndIsDnodeOnline(pDnode, curMs);
    mndReleaseDnode(pMnode, pDnode);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
          online);
    TAOS_RETURN(0);
  }

  mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, dnodeId);

  int32_t code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  // the db is only checked for acquirability; the reference is dropped immediately
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
2891

2892
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

/* Message handler for vgroup leader balancing; forwards to mndProcessVgroupBalanceLeaderMsgImp unchanged. */
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  int32_t code = mndProcessVgroupBalanceLeaderMsgImp(pReq);
  return code;
}

#ifndef TD_ENTERPRISE
/* Builds without TD_ENTERPRISE: the implementation ignores the request and reports success. */
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
2899

2900
/*
 * Re-applies memory accounting to every dnode in pArray for a vgroup changing
 * from pOldVgroup (under pOldDb) to pNewVgroup (under pNewDb).
 *
 * Per dnode:
 * - If the dnode hosts a replica of pOldVgroup, that vgroup's memory
 *   (mndGetVgroupMemory) is subtracted from memUsed.
 * - If it hosts a replica of pNewVgroup, the new vgroup's memory is added.
 *
 * The SDnodeObj entries inside pArray are modified in place.
 *
 * Returns 0 when every dnode still has memAvail > memUsed. Otherwise returns
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE at the first dnode that fails; later
 * dnodes are not visited.
 */
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  int32_t numOfDnodes = (int32_t)taosArrayGetSize(pArray);

  for (int32_t d = 0; d < numOfDnodes; ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    int64_t    memBefore = 0;
    int64_t    memAfter = 0;
    bool       hosted = false;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pDnode->id, pDnode->memAvail, pDnode->memUsed);

    // memory this dnode was charged for the vgroup before the change
    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      memBefore = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hosted = true;
    }

    // memory this dnode will be charged for the vgroup after the change
    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      memAfter = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hosted = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pDnode->id, memBefore, memAfter);

    pDnode->memUsed = pDnode->memUsed - memBefore + memAfter;

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    if (hosted) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }

  return 0;
}
2940

2941
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
529✔
2942
                                  SArray *pArray, SVgObj *pNewVgroup) {
2943
  int32_t code = 0;
529✔
2944
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
529✔
2945

2946
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
529!
2947
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
437!
2948
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
437!
2949
    return 0;
437✔
2950
  }
2951

2952
  // mndTransSetGroupParallel(pTrans);
2953

2954
  if (pNewDb->cfg.replications == 3) {
92✔
2955
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
85!
2956
          pVgroup->vnodeGid[0].dnodeId);
2957

2958
    // add second
2959
    if (pNewVgroup->replica == 1) {
85!
2960
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
85✔
2961
    }
2962

2963
    // learner stage
2964
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
84✔
2965
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
84✔
2966
    TAOS_CHECK_RETURN(
84!
2967
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2968

2969
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
84!
2970

2971
    // follower stage
2972
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
84✔
2973
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
84!
2974
    TAOS_CHECK_RETURN(
84!
2975
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2976

2977
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
84!
2978

2979
    // add third
2980
    if (pNewVgroup->replica == 2) {
84!
2981
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
84✔
2982
    }
2983

2984
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
73✔
2985
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
73✔
2986
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
73✔
2987
    TAOS_CHECK_RETURN(
73!
2988
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2989
    TAOS_CHECK_RETURN(
73!
2990
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
2991
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
73!
2992

2993
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
73!
2994
  } else if (pNewDb->cfg.replications == 1) {
7!
2995
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
7!
2996
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
2997
          pVgroup->vnodeGid[2].dnodeId);
2998

2999
    SVnodeGid del1 = {0};
7✔
3000
    SVnodeGid del2 = {0};
7✔
3001
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
7!
3002
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
7!
3003
    TAOS_CHECK_RETURN(
7!
3004
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3005
    TAOS_CHECK_RETURN(
7!
3006
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3007
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
7!
3008

3009
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
7!
3010
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
7!
3011
    TAOS_CHECK_RETURN(
7!
3012
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3013
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
7!
3014
  } else if (pNewDb->cfg.replications == 2) {
×
3015
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
×
3016
          pVgroup->vnodeGid[0].dnodeId);
3017

3018
    // add second
3019
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
3020

3021
    // learner stage
3022
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3023
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3024
    TAOS_CHECK_RETURN(
×
3025
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3026

3027
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
×
3028

3029
    // follower stage
3030
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3031
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
×
3032
    TAOS_CHECK_RETURN(
×
3033
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3034

3035
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
3036
  } else {
3037
    return -1;
×
3038
  }
3039

3040
  mndSortVnodeGid(pNewVgroup);
80✔
3041

3042
  {
3043
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
80✔
3044
    if (pVgRaw == NULL) {
80!
3045
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3046
      if (terrno != 0) code = terrno;
×
3047
      TAOS_RETURN(code);
×
3048
    }
3049
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
80!
3050
      sdbFreeRaw(pVgRaw);
×
3051
      TAOS_RETURN(code);
×
3052
    }
3053
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
80✔
3054
    if (code != 0) {
80!
3055
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3056
      TAOS_RETURN(code);
×
3057
    }
3058
  }
3059

3060
  TAOS_RETURN(code);
80✔
3061
}
3062

3063
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3064
                                      SArray *pArray) {
3065
  int32_t code = 0;
×
3066
  SVgObj  newVgroup = {0};
×
3067
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3068

3069
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3070
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3071
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3072
    return 0;
×
3073
  }
3074

3075
  mndTransSetSerial(pTrans);
×
3076

3077
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3078
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3079

3080
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3081
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3082
          pVgroup->vnodeGid[0].dnodeId);
3083

3084
    // add second
3085
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3086
    // add third
3087
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3088

3089
    // add learner stage
3090
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3091
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3092
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3093
    TAOS_CHECK_RETURN(
×
3094
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3095
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3096
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3097
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3098
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3099
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3100
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3101
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3102
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3103

3104
    // check learner
3105
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3106
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3107
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3108
    TAOS_CHECK_RETURN(
×
3109
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3110
    TAOS_CHECK_RETURN(
×
3111
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3112

3113
    // change raft type
3114
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3115
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3116
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3117
    TAOS_CHECK_RETURN(
×
3118
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3119

3120
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3121

3122
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3123
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3124
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3125
    TAOS_CHECK_RETURN(
×
3126
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3127

3128
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3129

3130
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3131
    if (pVgRaw == NULL) {
×
3132
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3133
      if (terrno != 0) code = terrno;
×
3134
      TAOS_RETURN(code);
×
3135
    }
3136
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3137
      sdbFreeRaw(pVgRaw);
×
3138
      TAOS_RETURN(code);
×
3139
    }
3140
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3141
    if (code != 0) {
×
3142
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3143
             __LINE__);
3144
      TAOS_RETURN(code);
×
3145
    }
3146
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3147
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3148
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3149

3150
    SVnodeGid del1 = {0};
×
3151
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3152

3153
    TAOS_CHECK_RETURN(
×
3154
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3155

3156
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3157

3158
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3159

3160
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3161
    if (pVgRaw == NULL) {
×
3162
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3163
      if (terrno != 0) code = terrno;
×
3164
      TAOS_RETURN(code);
×
3165
    }
3166
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3167
      sdbFreeRaw(pVgRaw);
×
3168
      TAOS_RETURN(code);
×
3169
    }
3170
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3171
    if (code != 0) {
×
3172
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3173
             __LINE__);
3174
      TAOS_RETURN(code);
×
3175
    }
3176

3177
    SVnodeGid del2 = {0};
×
3178
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3179

3180
    TAOS_CHECK_RETURN(
×
3181
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3182

3183
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3184

3185
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3186

3187
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3188
    if (pVgRaw == NULL) {
×
3189
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3190
      if (terrno != 0) code = terrno;
×
3191
      TAOS_RETURN(code);
×
3192
    }
3193
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3194
      sdbFreeRaw(pVgRaw);
×
3195
      TAOS_RETURN(code);
×
3196
    }
3197
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3198
    if (code != 0) {
×
3199
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3200
             __LINE__);
3201
      TAOS_RETURN(code);
×
3202
    }
3203
  } else {
3204
    return -1;
×
3205
  }
3206

3207
  mndSortVnodeGid(&newVgroup);
×
3208

3209
  {
3210
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3211
    if (pVgRaw == NULL) {
×
3212
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3213
      if (terrno != 0) code = terrno;
×
3214
      TAOS_RETURN(code);
×
3215
    }
3216
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3217
      sdbFreeRaw(pVgRaw);
×
3218
      TAOS_RETURN(code);
×
3219
    }
3220
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3221
    if (code != 0) {
×
3222
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3223
             __LINE__);
3224
      TAOS_RETURN(code);
×
3225
    }
3226
  }
3227

3228
  TAOS_RETURN(code);
×
3229
}
3230

3231
/*
 * Adds to pTrans the actions that recreate pVgroup's vnode on pDnode, then
 * commit-logs the vgroup with status READY. Role changes are made on a local
 * copy of the vgroup.
 *
 * - replica 1: every member is marked voter. A create-vnode action is added
 *   for the member on pDnode (index 0 if none matches).
 * - replica 2, in order:
 *   - pDnode's member is marked learner (others voter) and the vnode type on
 *     pAnotherDnode is altered.
 *   - The vnode on pDnode is created.
 *   - All members are marked voter and the vnode types on pDnode and
 *     pAnotherDnode are altered.
 * - replica 3: pDnode's member is created as learner. All members are then
 *   marked voter and the vnode type on pDnode is altered.
 * - any other replica: no vnode actions, only the commit log entry.
 */
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  switch (newVgroup.replica) {
    case 1: {
      int target = 0;
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
        if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) target = i;
      }
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[target]));
      break;
    }

    case 2:
      // restored member is a learner while the other one is re-typed
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole =
            (newVgroup.vnodeGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole =
            (newVgroup.vnodeGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      // everyone back to voter
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
      break;

    case 3:
      // recreate as learner ...
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole =
            (newVgroup.vnodeGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      // ... then promote to voter
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      break;

    default:
      break;
  }

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3310

3311
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
3312
  return 0;
×
3313
}
3314

3315
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3316

3317
/*
 * Encodes pVg and appends it to pTrans, tagged with vgStatus. The entry goes to
 * the commit log when stage is TRN_STAGE_COMMIT_ACTION and to the redo log
 * otherwise.
 *
 * Ownership: the encoded raw is freed here only if it was never handed to the
 * transaction. Once the append succeeds, the transaction keeps the pointer.
 * Freeing it on a later failure (sdbSetRawStatus) would leave a dangling entry
 * in pTrans and a double free when the transaction is dropped.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // not appended: still ours to release
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // pRaw is now referenced by pTrans; never free it from here on
  code = sdbSetRawStatus(pRaw, vgStatus);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to %d, error:%s, line:%d", pVg->vgId, vgStatus, tstrerror(code),
           __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3338

3339
/*
 * Encodes pDb and appends it to pTrans, tagged with dbStatus. The entry goes to
 * the commit log when stage is TRN_STAGE_COMMIT_ACTION and to the redo log
 * otherwise.
 *
 * Ownership: the encoded raw is freed here only if the append fails. After a
 * successful append the transaction keeps the pointer, so a later
 * sdbSetRawStatus failure must not free it (that would double free when the
 * transaction is dropped).
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
  SSdbRaw        *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // not appended: still ours to release
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // pRaw is now referenced by pTrans; never free it from here on
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to %d, error:%s, line:%d", pDb->name, dbStatus, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3360

3361
/*
 * Builds and prepares a serial "split-vgroup" transaction that divides pVgroup
 * (of database pDb) into two vgroups.
 *
 * Steps:
 *  1. Bring the vgroup to two replicas: add one when it has 1, remove one
 *     when it has 3. Any other replica count is rejected with
 *     TSDB_CODE_OPS_NOT_SUPPORT.
 *  2. Disable writes on both vnodes and confirm.
 *  3. Make vnode 0 into newVg1 with hash range [hashBegin, mid] and vnode 1
 *     into newVg2 with [mid + 1, hashEnd], each with one replica.
 *     newVg1 gets vgId sdbGetMaxId(SDB_VGROUP) and newVg2 that value + 1.
 *  4. Redo-log newVg1/newVg2 as READY, pVgroup as DROPPED, and the database
 *     with vgVersion and numOfVgroups incremented.
 *  5. Bring each half back to pDb->cfg.replications when that differs from 1.
 *     Then commit-log the final vgroup/db state.
 *
 * Rejected with TSDB_CODE_OPS_NOT_SUPPORT when shared storage is enabled (if
 * built with USE_SHARED_STORAGE) or when the database uses an arbitrator.
 *
 * Returns 0 once the transaction is prepared, otherwise an error code. All
 * locally owned resources (dnode array, transaction, duplicated retentions)
 * are released on every path.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    // the dnode list is required to choose where replicas are added/removed
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  if (pDb->cfg.withArbitrator) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);

  // step 1: reshape the source vgroup into exactly two replicas
  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  if (newVg1.replica == 1) {
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // only 1- and 3-replica vgroups can be reshaped into the two-vnode layout below
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, replica:%d, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName, newVg1.replica);
    goto _OVER;
  }

  // step 2: stop writes on both vnodes before the hash range is cut
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // step 3: vnode 0 keeps the lower half, vnode 1 takes the upper half
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;  // halves first to avoid uint32 overflow
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  // step 4: redo-log the new vgroups and retire the source vgroup
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // deep-copy so dbObj does not alias pDb's array (it is destroyed at _OVER)
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // step 5: adjust vgroup replica back to the database setting
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3520

3521
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

/*
 * Handler for split-vgroup messages. It forwards pReq to
 * mndProcessSplitVgroupMsgImp() and returns that result unchanged.
 */
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t code = mndProcessSplitVgroupMsgImp(pReq);
  return code;
}

#ifndef TD_ENTERPRISE
/* Non-enterprise build: a no-op that always returns 0. */
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; }
#endif
3528

3529
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
21✔
3530
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
3531
  int32_t code = 0;
21✔
3532
  SVgObj  newVg = {0};
21✔
3533
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
21✔
3534
  mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica);
21!
3535
  for (int32_t i = 0; i < newVg.replica; ++i) {
62✔
3536
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
41!
3537
  }
3538

3539
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id));
21!
3540
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id));
21!
3541

3542
  {
3543
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
21✔
3544
    if (pRaw == NULL) {
21!
3545
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3546
      if (terrno != 0) code = terrno;
×
3547
      TAOS_RETURN(code);
×
3548
    }
3549
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
21!
3550
      sdbFreeRaw(pRaw);
×
3551
      TAOS_RETURN(code);
×
3552
    }
3553
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
21✔
3554
    if (code != 0) {
21!
3555
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
3556
      TAOS_RETURN(code);
×
3557
    }
3558
  }
3559

3560
  mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica);
21!
3561
  for (int32_t i = 0; i < newVg.replica; ++i) {
62✔
3562
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
41!
3563
  }
3564
  TAOS_RETURN(code);
21✔
3565
}
3566

3567
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
21✔
3568
                                            SHashObj *pBalancedVgroups) {
3569
  void   *pIter = NULL;
21✔
3570
  int32_t code = -1;
21✔
3571
  SSdb   *pSdb = pMnode->pSdb;
21✔
3572

3573
  while (1) {
13✔
3574
    SVgObj *pVgroup = NULL;
34✔
3575
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
34✔
3576
    if (pIter == NULL) break;
34!
3577
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
34✔
3578
      sdbRelease(pSdb, pVgroup);
12✔
3579
      continue;
13✔
3580
    }
3581

3582
    bool existInSrc = false;
22✔
3583
    bool existInDst = false;
22✔
3584
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
64✔
3585
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
42✔
3586
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
42✔
3587
      if (pGid->dnodeId == pDst->id) existInDst = true;
42!
3588
    }
3589

3590
    if (!existInSrc || existInDst) {
22!
3591
      sdbRelease(pSdb, pVgroup);
1✔
3592
      continue;
1✔
3593
    }
3594

3595
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
21✔
3596
    if (pDb == NULL) {
21!
3597
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3598
      if (terrno != 0) code = terrno;
×
3599
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3600
      goto _OUT;
×
3601
    }
3602

3603
    if (pDb->cfg.withArbitrator) {
21!
3604
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3605
      goto _OUT;
×
3606
    }
3607

3608
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
21✔
3609
    if (code == 0) {
21!
3610
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
21✔
3611
    }
3612

3613
  _OUT:
×
3614
    mndReleaseDb(pMnode, pDb);
21✔
3615
    sdbRelease(pSdb, pVgroup);
21✔
3616
    sdbCancelFetch(pSdb, pIter);
21✔
3617
    break;
21✔
3618
  }
3619

3620
  return code;
21✔
3621
}
3622

3623
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
24✔
3624
  int32_t   code = -1;
24✔
3625
  int32_t   numOfVgroups = 0;
24✔
3626
  STrans   *pTrans = NULL;
24✔
3627
  SHashObj *pBalancedVgroups = NULL;
24✔
3628

3629
  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
24✔
3630
  if (pBalancedVgroups == NULL) goto _OVER;
24!
3631

3632
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
24✔
3633
  if (pTrans == NULL) {
24!
3634
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3635
    if (terrno != 0) code = terrno;
×
3636
    goto _OVER;
×
3637
  }
3638
  mndTransSetSerial(pTrans);
24✔
3639
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
24!
3640
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
24✔
3641
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
14✔
3642

3643
  while (1) {
21✔
3644
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
34✔
3645
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
147✔
3646
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
113✔
3647
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
113!
3648
            pDnode->numOfSupportVnodes, pDnode->numOfOtherNodes, mndGetDnodeScore(pDnode, 0, 1));
3649
    }
3650

3651
    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
34✔
3652
    SDnodeObj *pDst = taosArrayGet(pArray, 0);
34✔
3653

3654
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
34✔
3655
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
34✔
3656
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, dstScore,
34!
3657
          pDst->id, dstScore);
3658

3659
    if (srcScore > dstScore - 0.000001) {
34✔
3660
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
21✔
3661
      if (code == 0) {
21!
3662
        pSrc->numOfVnodes--;
21✔
3663
        pDst->numOfVnodes++;
21✔
3664
        numOfVgroups++;
21✔
3665
        continue;
21✔
3666
      } else {
3667
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
×
3668
        break;
×
3669
      }
3670
    } else {
3671
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
13!
3672
      break;
13✔
3673
    }
3674
  }
3675

3676
  if (numOfVgroups <= 0) {
13!
3677
    mInfo("no need to balance vgroup");
×
3678
    code = 0;
×
3679
  } else {
3680
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
13!
3681
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
13!
3682
    code = TSDB_CODE_ACTION_IN_PROGRESS;
13✔
3683
  }
3684

3685
_OVER:
24✔
3686
  taosHashCleanup(pBalancedVgroups);
24✔
3687
  mndTransDrop(pTrans);
24✔
3688
  TAOS_RETURN(code);
24✔
3689
}
3690

3691
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
27✔
3692
  SMnode *pMnode = pReq->info.node;
27✔
3693
  int32_t code = -1;
27✔
3694
  SArray *pArray = NULL;
27✔
3695
  void   *pIter = NULL;
27✔
3696
  int64_t curMs = taosGetTimestampMs();
27✔
3697

3698
  SBalanceVgroupReq req = {0};
27✔
3699
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
27!
3700
    code = TSDB_CODE_INVALID_MSG;
×
3701
    goto _OVER;
×
3702
  }
3703

3704
  mInfo("start to balance vgroup");
27!
3705
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
27✔
3706
    goto _OVER;
1✔
3707
  }
3708

3709
  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
26!
3710
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
×
3711
    goto _OVER;
×
3712
  }
3713

3714
  while (1) {
79✔
3715
    SDnodeObj *pDnode = NULL;
105✔
3716
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
105✔
3717
    if (pIter == NULL) break;
105✔
3718
    if (!mndIsDnodeOnline(pDnode, curMs)) {
81✔
3719
      sdbCancelFetch(pMnode->pSdb, pIter);
2✔
3720
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
3721
      mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
2!
3722
      sdbRelease(pMnode->pSdb, pDnode);
2✔
3723
      goto _OVER;
2✔
3724
    }
3725

3726
    sdbRelease(pMnode->pSdb, pDnode);
79✔
3727
  }
3728

3729
  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
24✔
3730
  if (pArray == NULL) {
24!
3731
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3732
    if (terrno != 0) code = terrno;
×
3733
    goto _OVER;
×
3734
  }
3735

3736
  if (taosArrayGetSize(pArray) < 2) {
24!
3737
    mInfo("no need to balance vgroup since dnode num less than 2");
×
3738
    code = 0;
×
3739
  } else {
3740
    code = mndBalanceVgroup(pMnode, pReq, pArray);
24✔
3741
  }
3742

3743
  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);
24✔
3744

3745
_OVER:
27✔
3746
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
27!
3747
    mError("failed to balance vgroup since %s", tstrerror(code));
14!
3748
  }
3749

3750
  taosArrayDestroy(pArray);
27✔
3751
  tFreeSBalanceVgroupReq(&req);
27✔
3752
  TAOS_RETURN(code);
27✔
3753
}
3754

3755
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
345,458!
3756

3757
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
16✔
3758
  for (int i = 0; i < pVgroup->replica; i++) {
42✔
3759
    if (pVgroup->vnodeGid[i].dnodeId == dnodeId) return true;
36✔
3760
  }
3761
  return false;
6✔
3762
}
3763

3764
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
102✔
3765
                                     STimeWindow tw, bool metaOnly) {
3766
  SCompactVnodeReq compactReq = {0};
102✔
3767
  compactReq.dbUid = pDb->uid;
102✔
3768
  compactReq.compactStartTime = compactTs;
102✔
3769
  compactReq.tw = tw;
102✔
3770
  compactReq.metaOnly = metaOnly;
102✔
3771
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
102✔
3772

3773
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
102!
3774
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
102✔
3775
  if (contLen < 0) {
102!
3776
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3777
    return NULL;
×
3778
  }
3779
  contLen += sizeof(SMsgHead);
102✔
3780

3781
  void *pReq = taosMemoryMalloc(contLen);
102!
3782
  if (pReq == NULL) {
102!
3783
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3784
    return NULL;
×
3785
  }
3786

3787
  SMsgHead *pHead = pReq;
102✔
3788
  pHead->contLen = htonl(contLen);
102✔
3789
  pHead->vgId = htonl(pVgroup->vgId);
102✔
3790

3791
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
102!
3792
    taosMemoryFree(pReq);
×
3793
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3794
    return NULL;
×
3795
  }
3796
  *pContLen = contLen;
102✔
3797
  return pReq;
102✔
3798
}
3799

3800
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
102✔
3801
                                        STimeWindow tw, bool metaOnly) {
3802
  int32_t      code = 0;
102✔
3803
  STransAction action = {0};
102✔
3804
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
102✔
3805

3806
  int32_t contLen = 0;
102✔
3807
  void   *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly);
102✔
3808
  if (pReq == NULL) {
102!
3809
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3810
    if (terrno != 0) code = terrno;
×
3811
    TAOS_RETURN(code);
×
3812
  }
3813

3814
  action.pCont = pReq;
102✔
3815
  action.contLen = contLen;
102✔
3816
  action.msgType = TDMT_VND_COMPACT;
102✔
3817

3818
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
102!
3819
    taosMemoryFree(pReq);
×
3820
    TAOS_RETURN(code);
×
3821
  }
3822

3823
  TAOS_RETURN(code);
102✔
3824
}
3825

3826
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
102✔
3827
                                    STimeWindow tw, bool metaOnly) {
3828
  TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly));
102!
3829
  return 0;
102✔
3830
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc