• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.19
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndArbGroup.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndStream.h"
26
#include "mndTopic.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "tmisce.h"
31

32
#define VGROUP_VER_NUMBER   1
33
#define VGROUP_RESERVE_SIZE 60
34

35
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
36
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
38
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
39

40
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
42
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
44

45
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
46
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
49
int32_t mndTransProcessSsMigrateVgroupRsp(SRpcMsg *pRsp);
50

51
/*
 * Register the vgroup module with the mnode: message handlers, show-table callbacks,
 * and finally the SDB table (keyed by int32 vgId) whose row callbacks live in this file.
 * Returns the result of sdbSetTable.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  SSdbTable vgTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
  };

  // Responses to transaction actions go back to the generic transaction response handler;
  // SS-migrate responses have their own handler.
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_RSP, mndTransProcessSsMigrateVgroupRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);

  // Vgroup management requests.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);

  // Show-table retrieval and iterator cleanup.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, vgTable);
}
90

91
/* Module teardown. mndInitVgroup allocates nothing, so there is nothing to release. */
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}
92

93
/*
 * Serialize a vgroup into a freshly allocated SDB raw record.
 *
 * Field order: vgId, createdTime, updateTime, version, hashBegin, hashEnd, dbName,
 * dbUid, isTsma, replica, one dnodeId per replica, syncConfChangeVer, mountVgId,
 * then VGROUP_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL on failure; on failure the error is logged, the
 * partially built raw is freed, and terrno is left non-zero.
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  // Not referenced directly here; kept because the SDB_SET_* macros may use them.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  // Pessimistic default: any early jump to _OVER before the final reset reports failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)

  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[r].dnodeId, _OVER)
  }

  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
133

134
/*
 * Deserialize an SDB raw record into a newly allocated SSdbRow holding an SVgObj.
 *
 * Fields are read in the same order mndVgroupActionEncode writes them. syncConfChangeVer is
 * read only when the raw is long enough to contain it plus mountVgId and the reserve
 * (presumably older raws were written without it). For single-replica vgroups the only
 * vnode is marked TAOS_SYNC_STATE_LEADER.
 *
 * Returns the row, or NULL with terrno set (the row is freed). Rejects unsupported
 * software versions and out-of-range replica counts.
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // Not referenced directly here; kept because the SDB_GET_* macros may use them.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)

  // replica comes straight from stored bytes and is used below to index vnodeGid[]. Reject values
  // that cannot fit rather than writing past the array. Capacity assumed to be
  // TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA (the size the update path copies) -- confirm against SVgObj.
  if (pVgroup->replica < 0 || pVgroup->replica > TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
    goto _OVER;
  }

  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
    if (pVgroup->replica == 1) {
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
    }
  }
  if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}
191

192
/*
 * SDB validate hook for a vgroup row proposed by transaction pTrans.
 *
 * Decodes pRaw into a temporary row. The row is rejected when its vgId is below the value
 * returned by sdbGetMaxId for SDB_VGROUP. The temporary row is always released before returning.
 *
 * Returns 0 when valid. Returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is 0)
 * when decoding fails, and -1 when the id check fails.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb    *pSdb = pMnode->pSdb;
  SVgObj  *pVgroup = NULL;
  int32_t  code = -1;
  SSdbRow *pRow = mndVgroupActionDecode(pRaw);

  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }
  if (pVgroup == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (maxVgId <= pVgroup->vgId) {
    code = 0;
  } else {
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  }

_OVER:
  if (pVgroup != NULL) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
223

224
/* SDB insert hook: no bookkeeping beyond a trace line; always succeeds. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
228

229
/* SDB delete hook: the vgroup object owns nothing extra to free, so this only traces. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
233

234
/*
 * SDB update hook: apply the persisted fields of pNew onto the live object pOld.
 *
 * Runtime-only state is carried across in two directions:
 *  - per-vnode sync state (syncState, syncRestore, syncCanRead, applied/commit index, buffer
 *    usage) is copied from pOld's vnodes into pNew's vnodes with the same dnodeId;
 *  - statistics (table/timeseries counts, storage, pointsWritten, compact) are copied from pOld
 *    into pNew.
 * The resulting vnodeGid array is then copied back into pOld. Always returns 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Capture the old membership size before it is overwritten. The sync-state carry-over below
  // must scan every vnode the OLD config had. Otherwise, when replicas shrink, a surviving vnode
  // stored at an old position >= pNew->replica would lose its sync state.
  int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;

  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
      }
    }
  }

  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  return 0;
}
267

268
/*
 * Look up and acquire the vgroup with id vgId (release with mndReleaseVgroup).
 * Returns NULL when not acquired; a "not there" SDB error is translated to
 * TSDB_CODE_MND_VGROUP_NOT_EXIST, any other terrno is left unchanged.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
276

277
/* Release a vgroup reference obtained from mndAcquireVgroup (delegates to sdbRelease). */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
281

282
/*
 * Build a serialized SCreateVnodeReq asking dnode pDnode to create its vnode of pVgroup.
 *
 * The request carries pDb's configuration plus the vgroup membership. Voters go into
 * replicas[] and every other role into learnerReplicas[]. selfIndex / learnerSelfIndex is
 * pDnode's position inside the matching array (-1 when absent).
 *
 * Side effect: increments pVgroup->syncConfChangeVer; the incremented value is sent as
 * changeVersion.
 *
 * Returns a taosMemoryMalloc'ed buffer (ownership passes to the caller) and stores its length
 * in *pContLen. Returns NULL on failure, including when pDnode is not a member of the vgroup
 * (terrno = TSDB_CODE_APP_ERROR).
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;
  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
  // Bump the vgroup's config-change version; the bumped value is what the request carries.
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);
  createReq.encryptAlgorithm = pDb->cfg.encryptAlgorithm;
  int32_t code = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica  *pReplica =
        isVoter ? &createReq.replicas[createReq.replica] : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // Self indexes are positions inside the per-role arrays, taken before the counter advances.
    if (isVoter) {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
411

412
/*
 * Build an alter-vnode-config message for pVgroup from pDb's current configuration.
 *
 * Layout: an SMsgHead (contLen and vgId in network byte order) followed by the serialized
 * SAlterVnodeConfigReq. Returns a taosMemoryMalloc'ed buffer (ownership passes to the caller)
 * with the total length in *pContLen, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Only bodyLen bytes remain after the header. Passing the full contLen would tell the
  // serializer it may write sizeof(SMsgHead) bytes past the end of the allocation.
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
462

463
/*
 * Build a serialized SAlterVnodeReplicaReq describing pVgroup's current membership, addressed
 * to the vnode on dnodeId.
 *
 * Voters go into replicas[] and every other role into learnerReplicas[]. selfIndex /
 * learnerSelfIndex is dnodeId's position INSIDE the matching array, the same convention as
 * mndBuildCreateVnodeReq. The earlier implementation used the position in vnodeGid[], which
 * differs whenever roles are interleaved.
 *
 * Side effect: increments pVgroup->syncConfChangeVer (sent as changeVersion).
 * Returns a taosMemoryMalloc'ed buffer (ownership passes to the caller) with its length in
 * *pContLen, or NULL on failure (terrno = TSDB_CODE_APP_ERROR if dnodeId is not a member).
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SReplica  *pReplica = NULL;

    // Record the self index as the slot about to be filled in the per-role array.
    if (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      if (dnodeId == pVgid->dnodeId) {
        alterReq.selfIndex = alterReq.replica;
      }
      pReplica = &alterReq.replicas[alterReq.replica];
      alterReq.replica++;
    } else {
      if (dnodeId == pVgid->dnodeId) {
        alterReq.learnerSelfIndex = alterReq.learnerReplica;
      }
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
      alterReq.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
545

546
/*
 * Build a serialized check-learner-catchup request for pVgroup, addressed to the vnode on dnodeId.
 *
 * The membership is encoded like the alter-replica request: voters in replicas[], other roles
 * in learnerReplicas[]. selfIndex / learnerSelfIndex is dnodeId's position INSIDE the matching
 * array, not its position in vnodeGid[]. The message is serialized with
 * tSerializeSAlterVnodeReplicaReq (presumably SCheckLearnCatchupReq shares that layout -- confirm).
 *
 * Returns a taosMemoryMalloc'ed buffer (ownership passes to the caller) with its length in
 * *pContLen, or NULL on failure (terrno = TSDB_CODE_APP_ERROR if dnodeId is not a member).
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SReplica  *pReplica = NULL;

    // Record the self index as the slot about to be filled in the per-role array.
    if (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      if (dnodeId == pVgid->dnodeId) {
        req.selfIndex = req.replica;
      }
      pReplica = &req.replicas[req.replica];
      req.replica++;
    } else {
      if (dnodeId == pVgid->dnodeId) {
        req.learnerSelfIndex = req.learnerReplica;
      }
      pReplica = &req.learnerReplicas[req.learnerReplica];
      req.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

  mInfo(
      "vgId:%d, build check learner catchup req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d "
      "strict:%d",
      req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize check learner catchup req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
623

624
/*
 * Serialize a request that disables writes on vnode vgId (disable = 1).
 * Returns a taosMemoryMalloc'ed buffer with its length in *pContLen, or NULL with
 * terrno = TSDB_CODE_OUT_OF_MEMORY on any sizing/allocation/serialization failure.
 */
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq req = {.vgId = vgId, .disable = 1};
  void                 *pBuf = NULL;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // First pass computes the encoded size; a negative size and a failed allocation share one exit.
  int32_t len = tSerializeSDisableVnodeWriteReq(NULL, 0, &req);
  if (len >= 0) {
    pBuf = taosMemoryMalloc(len);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, len, &req) < 0) {
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = len;
  return pBuf;
}
652

653
/*
 * Serialize a hash-range change from vgroup srcVgId to pVgroup's [hashBegin, hashEnd].
 * Side effect: increments pVgroup->syncConfChangeVer, whose new value is sent as changeVersion.
 * Returns a taosMemoryMalloc'ed buffer with its length in *pContLen, or NULL with
 * terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq req = {
      .srcVgId = srcVgId,
      .dstVgId = pVgroup->vgId,
      .hashBegin = pVgroup->hashBegin,
      .hashEnd = pVgroup->hashEnd,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };
  void *pBuf = NULL;

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // Size pass first; a negative size and a failed allocation share one exit.
  int32_t len = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &req);
  if (len >= 0) {
    pBuf = taosMemoryMalloc(len);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, len, &req) < 0) {
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = len;
  return pBuf;
}
685

686
/*
 * Serialize a request asking dnode pDnode to drop its vnode of pVgroup (database pDb).
 * Returns a taosMemoryMalloc'ed buffer with its length in *pContLen, or NULL with
 * terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq req = {0};
  void         *pBuf = NULL;

  req.dnodeId = pDnode->id;
  req.vgId = pVgroup->vgId;
  req.dbUid = pDb->uid;
  memcpy(req.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", req.vgId);

  // Size pass first; a negative size and a failed allocation share one exit.
  int32_t len = tSerializeSDropVnodeReq(NULL, 0, &req);
  if (len >= 0) {
    pBuf = taosMemoryMalloc(len);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, len, &req) < 0) {
    mError("vgId:%d, failed to serialize drop vnode req,since %s", req.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = len;
  return pBuf;
}
715

716
/* Per-dnode traversal callback: zero the vnode and other-node counters; always returns true. */
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = (SDnodeObj *)pObj;

  pDnode->numOfOtherNodes = 0;
  pDnode->numOfVnodes = 0;
  return true;
}
722

723
/*
 * sdbTraverse callback that collects placement candidates.
 *
 * p1: SArray of SDnodeObj receiving copies of eligible dnodes.
 * p2: int32_t* id of a dnode to exclude.
 * p3: optional SArray of int32_t dnode ids; when non-NULL only listed dnodes qualify
 *     (an empty list admits none, TS-6191).
 *
 * Refreshes numOfVnodes / memUsed / numOfOtherNodes on the dnode, then appends it when it is online and
 * supports vnodes. Returns false only if appending fails.
 */
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pArray = p1;
  int32_t    exceptDnodeId = *(int32_t *)p2;
  SArray    *dnodeList = p3;

  if (pDnode->id == exceptDnodeId) {
    return true;
  }

  if (dnodeList != NULL) {
    int32_t listSize = taosArrayGetSize(dnodeList);
    if (listSize <= 0) {
      return true;  // TS-6191
    }

    bool found = false;
    for (int32_t k = 0; k < listSize && !found; ++k) {
      found = (*(int32_t *)taosArrayGet(dnodeList, k) == pDnode->id);
    }
    if (!found) {
      return true;
    }
  }

  int64_t nowMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, nowMs);
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  // A dnode that also hosts an mnode carries extra load in the placement score.
  if (isMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (!online || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pArray, pDnode) != NULL;
}
769

770
/* Return true when dnodeId occurs in dnodeList, an SArray of int32_t dnode ids (linear scan). */
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t total = taosArrayGetSize(dnodeList);
  int32_t idx = 0;
  while (idx < total && *(int32_t *)TARRAY_GET_ELEM(dnodeList, idx) != dnodeId) {
    ++idx;
  }
  return idx < total;
}
780

781
#ifdef TD_ENTERPRISE
/*
 * Signed load score for the TS-6191 dual-replica selection.
 *
 * The magnitude is (vnodes + others * ratio + additionDnodes) / supportVnodes. It is negated for dnodes that
 * already host vnodes, so an ascending sort puts those dnodes ahead of dnodes that host none.
 * NOTE: divides by numOfSupportVnodes; the array being sorted only contains dnodes with numOfSupportVnodes > 0.
 */
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float magnitude = load / pDnode->numOfSupportVnodes;
  if (pDnode->numOfVnodes > 0) {
    return -magnitude;
  }
  return magnitude;
}
787

788
/* qsort comparator: ascending by mndGetDnodeScore1 (ratio 0.9), ties broken by ascending dnode id. */
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);
  if (lhs != rhs) {
    return (lhs > rhs) ? 1 : -1;
  }
  if (pDnode1->id == pDnode2->id) {
    return 0;
  }
  return (pDnode1->id > pDnode2->id) ? 1 : -1;
}
799

800
/*
 * sdbTraverse callback for the dual-replica check: refresh the dnode's vnode count and mnode load, then append a
 * copy to p1 (SArray of SDnodeObj) when it supports vnodes. Online state is not consulted here.
 * Returns false only if appending fails.
 */
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pArray = p1;

  bool isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (isMnode) {
    ++pDnode->numOfOtherNodes;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pArray, pDnode) != NULL;
}
816

817
// TS-6191
818
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
5,356✔
819
  int32_t code = 0;
5,356✔
820
  if (!grantCheckDualReplicaDnodes(pMnode)) {
5,356✔
821
    TAOS_RETURN(code);
5,354✔
822
  }
823
  SSdb   *pSdb = pMnode->pSdb;
2✔
824
  SArray *pArray = taosArrayInit(nDnodes, sizeof(SDnodeObj));
2✔
825
  if (pArray == NULL) {
2!
826
    TAOS_RETURN(code = terrno);
×
827
  }
828
  *ppDnodeList = pArray;
2✔
829

830
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
2✔
831
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pArray, NULL, NULL);
2✔
832

833
  int32_t arrSize = taosArrayGetSize(pArray);
2✔
834
  if (arrSize <= 0) {
2!
835
    TAOS_RETURN(code);
×
836
  }
837
  if (arrSize > 1) taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes1);
2!
838

839
  int32_t dnodeListSize = taosArrayGetSize(dnodeList);
2✔
840
  if (dnodeListSize <= 0) {
2✔
841
    if (arrSize > 2) taosArrayRemoveBatch(pArray, 2, arrSize - 2, NULL);
1!
842
  } else {
843
    int32_t nDnodesWithVnodes = 0;
1✔
844
    for (int32_t i = 0; i < arrSize; ++i) {
3!
845
      SDnodeObj *pDnode = TARRAY_GET_ELEM(pArray, i);
3✔
846
      if (pDnode->numOfVnodes <= 0) {
3✔
847
        break;
1✔
848
      }
849
      ++nDnodesWithVnodes;
2✔
850
    }
851
    int32_t dnodeId = -1;
1✔
852
    if (nDnodesWithVnodes == 1) {
1!
853
      dnodeId = ((SDnodeObj *)TARRAY_GET_ELEM(pArray, 0))->id;
×
854
    } else if (nDnodesWithVnodes >= 2) {
1!
855
      // must select the dnodes from the 1st 2 dnodes
856
      taosArrayRemoveBatch(pArray, 2, arrSize - 2, NULL);
1✔
857
    }
858
    for (int32_t i = 0; i < TARRAY_SIZE(pArray);) {
3✔
859
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
2✔
860
      if (!isDnodeInList(dnodeList, pDnode->id)) {
2!
861
        taosArrayRemove(pArray, i);
×
862
        continue;
×
863
      }
864
      ++i;
2✔
865
    }
866
    if (nDnodesWithVnodes == 1) {
1!
867
      SDnodeObj *pDnode = taosArrayGet(pArray, 0);
×
868
      if (pDnode && (pDnode->id != dnodeId)) {  // the first dnode is not in dnodeList, remove the last element
×
869
        taosArrayRemove(pArray, taosArrayGetSize(pArray) - 1);
×
870
      }
871
    }
872
  }
873

874
  TAOS_RETURN(code);
2✔
875
}
876
#endif
877

878
/*
 * Collect the dnodes that can currently host new vnodes.
 *
 * exceptDnodeId: dnode id to leave out.
 * dnodeList:     optional SArray of int32_t dnode ids; when non-empty only these dnodes are candidates.
 * In TD_ENTERPRISE builds a dual-replica license can narrow the candidates further (TS-6191).
 *
 * Returns a new SArray of SDnodeObj copies that the caller destroys, or NULL on failure.
 */
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);

  SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // An empty user list is treated exactly like no list.
  SArray *pUserList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
  SArray *pDualList = NULL;
#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, pUserList, &pDualList) != 0) {
    taosArrayDestroy(pArray);
    return NULL;
  }
#endif

  SArray *pFilter = (pDualList != NULL) ? pDualList : pUserList;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, pFilter);

  int32_t total = (int32_t)taosArrayGetSize(pArray);
  mDebug("build %d dnodes array", total);
  for (int32_t i = 0; i < total; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  taosArrayDestroy(pDualList);
  return pArray;
}
909

910
/* qsort-style three-way comparison of two dnode ids: -1, 0 or 1. */
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  int32_t lhs = *dnode1Id;
  int32_t rhs = *dnode2Id;
  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
916

917
/*
 * Load per supported vnode slot: (vnodes + others * ratio + additionDnodes) / numOfSupportVnodes.
 * Lower means less loaded. NOTE: divides by numOfSupportVnodes; candidate arrays only hold dnodes where it is > 0.
 */
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  return weightedLoad / pDnode->numOfSupportVnodes;
}
921

922
/* qsort comparator: ascending by mndGetDnodeScore (ratio 0.9); equal scores compare equal. */
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);
  if (lhs == rhs) {
    return 0;
  }
  return (lhs > rhs) ? 1 : -1;
}
930

931
/*
 * Sort pVgroup->vnodeGid[0..replica) in place by ascending dnodeId.
 * Uses a simple stable bubble sort; the replica count is expected to be small.
 */
void mndSortVnodeGid(SVgObj *pVgroup) {
  for (int32_t last = pVgroup->replica - 1; last > 0; --last) {
    for (int32_t j = 0; j < last; ++j) {
      if (pVgroup->vnodeGid[j].dnodeId > pVgroup->vnodeGid[j + 1].dnodeId) {
        TSWAP(pVgroup->vnodeGid[j], pVgroup->vnodeGid[j + 1]);
      }
    }
  }
}
940

941
/*
 * Choose pVgroup->replica dnodes from pArray and record them in pVgroup->vnodeGid.
 *
 * pArray is sorted in place by mndCompareDnodeVnodes and its first `replica` entries are used. Each chosen entry
 * is charged with the vgroup's memory and one vnode, so calling this repeatedly with the same array spreads
 * successive vgroups. Entries charged before a failure stay charged.
 *
 * Returns 0, TSDB_CODE_MND_NO_ENOUGH_DNODES, TSDB_CODE_MND_NO_ENOUGH_VNODES or
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE.
 */
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t size = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < size; ++i) {
    SDnodeObj *pCandidate = taosArrayGet(pArray, i);
    mDebug("dnode:%d, score:%f", pCandidate->id, mndGetDnodeScore(pCandidate, 0, 0.9));
  }

  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    int64_t memLeft = pDnode->memAvail - vgMem - pDnode->memUsed;
    if (memLeft <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    pVgid->dnodeId = pDnode->id;
    // A single replica leads immediately; with multiple replicas all start as followers.
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
990

991
/*
 * Fill *pVgroup as a single-replica TSMA vgroup of pDb and place it on the least-loaded available dnode.
 *
 * The temporary dnode candidate array is released on every path.
 * Returns 0 on success or a non-zero error code (from mndBuildDnodesArray / mndGetAvailableDnode).
 */
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;

  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  taosArrayDestroy(pArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1015

1016
/*
 * Allocate pDb->cfg.numOfVgroups vgroups for pDb.
 *
 * Each vgroup gets a fresh vgId (never below 2), an equal slice of the 32-bit hash range (the last slice runs to
 * UINT32_MAX), and pDb->cfg.replications dnodes chosen by mndGetAvailableDnode. dnodeList optionally restricts
 * the candidate dnodes (see mndBuildDnodesArray).
 *
 * On success *ppVgroups receives a taosMemoryCalloc'ed array owned by the caller; on failure *ppVgroups is left
 * untouched and the array is freed. Returns 0 or an error code.
 */
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Each vgroup's update time starts at its own creation time.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      pVgroup->hashEnd = hashMax;  // last vgroup absorbs the division remainder
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1079

1080
/*
 * Build the endpoint set of pVgroup from the dnodes hosting its replicas.
 *
 * Dnodes that cannot be acquired are skipped. inUse is set to the slot at which a leader (or assigned leader)
 * endpoint is appended. The set is then passed through epsetSort, presumably keeping inUse on the same endpoint.
 */
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode == NULL) {
      continue;
    }

    bool isLeader =
        (pVgid->syncState == TAOS_SYNC_STATE_LEADER) || (pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      epset.inUse = epset.numOfEps;
    }

    int32_t rc = addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port);
    if (rc != 0) {
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
    }
    mndReleaseDnode(pMnode, pDnode);
  }

  epsetSort(&epset);
  return epset;
}
1101

1102
/*
 * Endpoint set of the vgroup with id vgId, or an empty (zeroed) SEpSet if the vgroup cannot be acquired.
 *
 * Delegates to mndGetVgroupEpset so both entry points build the set identically, including the epsetSort step
 * that the previous hand-copied loop omitted.
 */
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet epset = {0};

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  epset = mndGetVgroupEpset(pMnode, pVgroup);

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1126

1127
/*
 * Fill one batch of the vgroups system table: one row per vgroup, optionally limited to the database named in
 * pShow->db.
 *
 * Each row holds vgId, db name, table count, four (dnode, role, applied/commit) column groups (NULL beyond
 * `replica`), cache usage, cached tables, isTsma and mountVgId.
 * Returns the number of rows produced, or an error code. Every exit path releases the db reference.
 */
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  int64_t curMs = taosGetTimestampMs();
  int32_t code = 0, lino = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      pShow->pIter = NULL;  // already cancelled; must not be cancelled again
      lino = __LINE__;
      goto _OVER;  // also releases pDb
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i < pVgroup->replica) {
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
        COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

        bool       exist = false;
        bool       online = false;
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
        if (pDnode != NULL) {
          exist = true;
          online = mndIsDnodeOnline(pDnode, curMs);
          mndReleaseDnode(pMnode, pDnode);
        }

        char role[20] = "offline";
        // Sized for the longest role string plus the varstr length header.
        char buf1[sizeof(role) + VARSTR_HEADER_SIZE] = {0};
        if (!exist) {
          tstrncpy(role, "dropping", sizeof(role));
        } else if (online) {
          const char *star = "";
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
            // "**": leader neither restored nor readable; "*": not restored but already readable.
            if (!pVgroup->vnodeGid[i].syncRestore && !pVgroup->vnodeGid[i].syncCanRead) {
              star = "**";
            } else if (!pVgroup->vnodeGid[i].syncRestore && pVgroup->vnodeGid[i].syncCanRead) {
              star = "*";
            }
          }
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
        }
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
                 pVgroup->vnodeGid[i].syncCommitIndex);
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
      } else {
        // Replica slot not used: dnode, role and apply columns are NULL.
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
      }
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1281

1282
/* Abandon an in-progress SDB_VGROUP iteration started by mndRetrieveVgroups. */
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
1286

1287
/*
 * sdbTraverse callback: add to *(int32_t *)p2 the number of replicas of this vgroup placed on dnode *(int32_t *)p1.
 */
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVgroup = pObj;
  int32_t  targetId = *(int32_t *)p1;
  int32_t *pCount = (int32_t *)p2;

  int32_t hits = 0;
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    hits += (pVgroup->vnodeGid[r].dnodeId == targetId) ? 1 : 0;
  }
  *pCount += hits;
  return true;
}
1300

1301
/* Count the vnodes (vgroup replicas) currently placed on dnode dnodeId. */
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t count = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &count, NULL);
  return count;
}
1306

1307
/*
 * Estimated memory of one vnode of pVgroup, in bytes:
 *   buffer (MB) + pages * pageSize (KB) + cacheLastSize (MB, only when cacheLast > 0).
 *
 * pDbInput may be NULL, in which case the db is acquired by pVgroup->dbName and released here.
 * Returns 0 when no db is available.
 */
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  bool    ownsDb = (pDbInput == NULL);
  SDbObj *pDb = ownsDb ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;

  int64_t total = 0;
  if (pDb != NULL) {
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
    total = buffer + cache + ((pDb->cfg.cacheLast > 0) ? cacheLast : 0);
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  if (ownsDb) {
    mndReleaseDb(pMnode, pDb);
  }
  return total;
}
1331

1332
/*
 * sdbTraverse callback: for every replica of this vgroup on dnode *(int32_t *)p1, add the vgroup's estimated
 * memory (mndGetVgroupMemory) to *(int64_t *)p2.
 */
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVgroup = pObj;
  int32_t  targetId = *(int32_t *)p1;
  int64_t *pTotal = (int64_t *)p2;

  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    if (pVgroup->vnodeGid[r].dnodeId != targetId) {
      continue;
    }
    *pTotal += mndGetVgroupMemory(pMnode, NULL, pVgroup);
  }
  return true;
}
1345

1346
/* Sum of the estimated memory of all vnodes placed on dnode dnodeId. */
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t totalMemory = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &totalMemory, NULL);
  return totalMemory;
}
1351

1352
/*
 * Format the estimated time to apply applyCount pending log entries at `rate` as "H:M:S" into restoreStr.
 *
 * applyCount / rate is treated as milliseconds (it is divided by 1000 to get seconds). A rate that is zero,
 * negative or NaN gives no usable estimate and yields "0:0:0". The millisecond value is clamped to
 * [0, INT64_MAX] before conversion: converting an out-of-range double to int64_t is undefined behaviour, and a
 * tiny rate could otherwise trigger it.
 */
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  if (!(rate > 0)) {  // also catches NaN
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  double  costMs = (double)applyCount / rate;
  int64_t costTime = 0;
  if (costMs >= (double)INT64_MAX) {
    costTime = INT64_MAX;
  } else if (costMs > 0) {
    costTime = (int64_t)costMs;
  }

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1366

1367
/*
 * Fill one batch of the vnodes system table: one row per (vgroup, replica) pair.
 *
 * Returns the number of rows produced, or an error code. On error, the vgroup and dnode references held by this
 * call are released, and the vgroup iterator is cancelled and cleared so it is not cancelled a second time.
 */
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  SVgObj    *pVgroup = NULL;
  SDnodeObj *pDnode = NULL;
  int32_t    cols = 0;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    code = 0;

  // Start another vgroup only while more than TSDB_MAX_REPLICA rows of space remain.
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      // dnode is online?
      pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);

      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncState, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set roleTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set startTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      // restoreStr can hold up to 19 characters, so the varstr needs room for those plus its length header.
      char restoreBuf[sizeof(restoreStr) + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(restoreBuf, restoreStr);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)restoreBuf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore finish time, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&unappliedCount, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->bufferSegmentUsed, false);
      if (code != 0) {
        mError("vgId:%d, failed to set buffer segment used, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->bufferSegmentSize, false);
      if (code != 0) {
        mError("vgId:%d, failed to set buffer segment size, since %s", pVgroup->vgId, tstrerror(code));
        goto _OVER;
      }

      numOfRows++;
      sdbRelease(pSdb, pDnode);
      pDnode = NULL;
    }

    sdbRelease(pSdb, pVgroup);
    pVgroup = NULL;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

_OVER:
  if (pDnode != NULL) {
    sdbRelease(pSdb, pDnode);
  }
  if (pVgroup != NULL) {
    sdbRelease(pSdb, pVgroup);
  }
  if (pShow->pIter != NULL) {
    sdbCancelFetch(pSdb, pShow->pIter);
    pShow->pIter = NULL;
  }
  return code;
}
1498

1499
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
×
1500
  SSdb *pSdb = pMnode->pSdb;
×
1501
  sdbCancelFetchByType(pSdb, pIter, SDB_VGROUP);
×
1502
}
×
1503

1504
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
210✔
1505
  int32_t code = 0;
210✔
1506
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
210✔
1507
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
797✔
1508
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
587✔
1509
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
587!
1510
          pDnode->numOfOtherNodes);
1511
  }
1512

1513
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
210✔
1514
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
269✔
1515
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
256✔
1516

1517
    bool used = false;
256✔
1518
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
610✔
1519
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
413✔
1520
        used = true;
59✔
1521
        break;
59✔
1522
      }
1523
    }
1524
    if (used) continue;
256✔
1525

1526
    if (pDnode == NULL) {
197!
1527
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
×
1528
    }
1529
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
197!
1530
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
×
1531
    }
1532

1533
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
197✔
1534
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
197!
1535
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1536
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
1537
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
1538
    } else {
1539
      pDnode->memUsed += vgMem;
197✔
1540
    }
1541

1542
    pVgid->dnodeId = pDnode->id;
197✔
1543
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
197✔
1544
    mInfo("trans:%id, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
197!
1545
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
1546
          pDnode->memUsed);
1547

1548
    pVgroup->replica++;
197✔
1549
    pDnode->numOfVnodes++;
197✔
1550

1551
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
197✔
1552
    if (pVgRaw == NULL) {
197!
1553
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1554
      if (terrno != 0) code = terrno;
×
1555
      TAOS_RETURN(code);
×
1556
    }
1557
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
197!
1558
      sdbFreeRaw(pVgRaw);
×
1559
      TAOS_RETURN(code);
×
1560
    }
1561
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
197✔
1562
    if (code != 0) {
197!
1563
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1564
             tstrerror(code), __LINE__);
1565
    }
1566
    TAOS_RETURN(code);
197✔
1567
  }
1568

1569
  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
13✔
1570
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
13!
1571
         tstrerror(code));
1572
  TAOS_RETURN(code);
13✔
1573
}
1574

1575
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
40✔
1576
                                        SVnodeGid *pDelVgid) {
1577
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
40✔
1578
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
173✔
1579
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
133✔
1580
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
133!
1581
          pDnode->numOfOtherNodes);
1582
  }
1583

1584
  int32_t code = -1;
40✔
1585
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
53!
1586
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
53✔
1587

1588
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
123✔
1589
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
110✔
1590
      if (pVgid->dnodeId == pDnode->id) {
110✔
1591
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
40✔
1592
        pDnode->memUsed -= vgMem;
40✔
1593
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
40!
1594
              " used:%" PRId64,
1595
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1596
        pDnode->numOfVnodes--;
40✔
1597
        pVgroup->replica--;
40✔
1598
        *pDelVgid = *pVgid;
40✔
1599
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
40✔
1600
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
40✔
1601
        code = 0;
40✔
1602
        goto _OVER;
40✔
1603
      }
1604
    }
1605
  }
1606

1607
_OVER:
×
1608
  if (code != 0) {
40!
1609
    code = TSDB_CODE_APP_ERROR;
×
1610
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
×
1611
           tstrerror(code));
1612
    TAOS_RETURN(code);
×
1613
  }
1614

1615
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
112✔
1616
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
72✔
1617
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
72!
1618
          pVgid->dnodeId);
1619
  }
1620

1621
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
40✔
1622
  if (pVgRaw == NULL) {
40!
1623
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1624
    if (terrno != 0) code = terrno;
×
1625
    TAOS_RETURN(code);
×
1626
  }
1627
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
40!
1628
    sdbFreeRaw(pVgRaw);
×
1629
    TAOS_RETURN(code);
×
1630
  }
1631
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
40✔
1632
  if (code != 0) {
40!
1633
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1634
           tstrerror(code), __LINE__);
1635
  }
1636

1637
  TAOS_RETURN(code);
40✔
1638
}
1639

1640
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1641
                                                   SVnodeGid *pDelVgid) {
1642
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1643
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1644
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1645
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1646
  }
1647

1648
  int32_t code = -1;
×
1649
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1650
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1651

1652
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1653
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1654
      if (pVgid->dnodeId == pDnode->id) {
×
1655
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1656
        pDnode->memUsed -= vgMem;
×
1657
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1658
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1659
        pDnode->numOfVnodes--;
×
1660
        pVgroup->replica--;
×
1661
        *pDelVgid = *pVgid;
×
1662
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1663
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1664
        code = 0;
×
1665
        goto _OVER;
×
1666
      }
1667
    }
1668
  }
1669

1670
_OVER:
×
1671
  if (code != 0) {
×
1672
    code = TSDB_CODE_APP_ERROR;
×
1673
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1674
    TAOS_RETURN(code);
×
1675
  }
1676

1677
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1678
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1679
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1680
  }
1681

1682
  TAOS_RETURN(code);
×
1683
}
1684

1685
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
11,270✔
1686
  int32_t      code = 0;
11,270✔
1687
  STransAction action = {0};
11,270✔
1688

1689
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
11,270✔
1690
  if (pDnode == NULL) return -1;
11,270!
1691
  action.epSet = mndGetDnodeEpset(pDnode);
11,270✔
1692
  mndReleaseDnode(pMnode, pDnode);
11,270✔
1693

1694
  int32_t contLen = 0;
11,270✔
1695
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
11,270✔
1696
  if (pReq == NULL) return -1;
11,270!
1697

1698
  action.pCont = pReq;
11,270✔
1699
  action.contLen = contLen;
11,270✔
1700
  action.msgType = TDMT_DND_CREATE_VNODE;
11,270✔
1701
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
11,270✔
1702
  action.groupId = pVgroup->vgId;
11,270✔
1703

1704
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
11,270!
1705
    taosMemoryFree(pReq);
×
1706
    TAOS_RETURN(code);
×
1707
  }
1708

1709
  TAOS_RETURN(code);
11,270✔
1710
}
1711

1712
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1713
                                       SDnodeObj *pDnode) {
1714
  int32_t      code = 0;
10✔
1715
  STransAction action = {0};
10✔
1716

1717
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1718

1719
  int32_t contLen = 0;
10✔
1720
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
10✔
1721
  if (pReq == NULL) {
10!
1722
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1723
    if (terrno != 0) code = terrno;
×
1724
    TAOS_RETURN(code);
×
1725
  }
1726

1727
  action.pCont = pReq;
10✔
1728
  action.contLen = contLen;
10✔
1729
  action.msgType = TDMT_DND_CREATE_VNODE;
10✔
1730
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
10✔
1731
  action.groupId = pVgroup->vgId;
10✔
1732

1733
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
1734
    taosMemoryFree(pReq);
×
1735
    TAOS_RETURN(code);
×
1736
  }
1737

1738
  TAOS_RETURN(code);
10✔
1739
}
1740

1741
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
705✔
1742
  int32_t      code = 0;
705✔
1743
  STransAction action = {0};
705✔
1744
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
705✔
1745

1746
  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);
705!
1747
  int32_t   contLen = sizeof(SMsgHead);
705✔
1748
  SMsgHead *pHead = taosMemoryMalloc(contLen);
705!
1749
  if (pHead == NULL) {
705!
1750
    TAOS_RETURN(terrno);
×
1751
  }
1752

1753
  pHead->contLen = htonl(contLen);
705✔
1754
  pHead->vgId = htonl(pVgroup->vgId);
705✔
1755

1756
  action.pCont = pHead;
705✔
1757
  action.contLen = contLen;
705✔
1758
  action.msgType = TDMT_VND_ALTER_CONFIRM;
705✔
1759
  // incorrect redirect result will cause this erro
1760
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
705✔
1761
  action.groupId = pVgroup->vgId;
705✔
1762

1763
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
705!
1764
    taosMemoryFree(pHead);
×
1765
    TAOS_RETURN(code);
×
1766
  }
1767

1768
  TAOS_RETURN(code);
705✔
1769
}
1770

1771
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
×
1772
                                 int32_t dnodeId) {
1773
  int32_t      code = 0;
×
1774
  STransAction action = {0};
×
1775
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);
×
1776

1777
  int32_t contLen = 0;
×
1778
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &contLen);
×
1779
  if (pReq == NULL) {
×
1780
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1781
    if (terrno != 0) code = terrno;
×
1782
    TAOS_RETURN(code);
×
1783
  }
1784

1785
  int32_t totallen = contLen + sizeof(SMsgHead);
×
1786

1787
  SMsgHead *pHead = taosMemoryMalloc(totallen);
×
1788
  if (pHead == NULL) {
×
1789
    taosMemoryFree(pReq);
×
1790
    TAOS_RETURN(terrno);
×
1791
  }
1792

1793
  pHead->contLen = htonl(totallen);
×
1794
  pHead->vgId = htonl(pNewVgroup->vgId);
×
1795

1796
  memcpy((void *)(pHead + 1), pReq, contLen);
×
1797
  taosMemoryFree(pReq);
×
1798

1799
  action.pCont = pHead;
×
1800
  action.contLen = totallen;
×
1801
  action.msgType = TDMT_SYNC_CONFIG_CHANGE;
×
1802

1803
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1804
    taosMemoryFree(pHead);
×
1805
    TAOS_RETURN(code);
×
1806
  }
1807

1808
  TAOS_RETURN(code);
×
1809
}
1810

1811
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
56✔
1812
  int32_t      code = 0;
56✔
1813
  STransAction action = {0};
56✔
1814
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
56✔
1815

1816
  int32_t contLen = 0;
56✔
1817
  void   *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &contLen);
56✔
1818
  if (pReq == NULL) {
56!
1819
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1820
    if (terrno != 0) code = terrno;
×
1821
    TAOS_RETURN(code);
×
1822
  }
1823

1824
  action.pCont = pReq;
56✔
1825
  action.contLen = contLen;
56✔
1826
  action.msgType = TDMT_VND_ALTER_HASHRANGE;
56✔
1827
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
56✔
1828

1829
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
56!
1830
    taosMemoryFree(pReq);
×
1831
    TAOS_RETURN(code);
×
1832
  }
1833

1834
  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
56!
1835
  TAOS_RETURN(code);
56✔
1836
}
1837

1838
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
483✔
1839
  int32_t      code = 0;
483✔
1840
  STransAction action = {0};
483✔
1841
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
483✔
1842

1843
  int32_t contLen = 0;
483✔
1844
  void   *pReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &contLen);
483✔
1845
  if (pReq == NULL) {
483!
1846
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1847
    if (terrno != 0) code = terrno;
×
1848
    TAOS_RETURN(code);
×
1849
  }
1850

1851
  action.pCont = pReq;
483✔
1852
  action.contLen = contLen;
483✔
1853
  action.msgType = TDMT_VND_ALTER_CONFIG;
483✔
1854
  action.groupId = pVgroup->vgId;
483✔
1855

1856
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
483!
1857
    taosMemoryFree(pReq);
×
1858
    TAOS_RETURN(code);
×
1859
  }
1860

1861
  TAOS_RETURN(code);
483✔
1862
}
1863

1864
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
10,009✔
1865
  int32_t  code = 0;
10,009✔
1866
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
10,009✔
1867
  if (pRaw == NULL) {
10,009!
1868
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1869
    if (terrno != 0) code = terrno;
×
1870
    goto _err;
×
1871
  }
1872

1873
  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);
10,009!
1874
  if (sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
10,009!
1875
    mError("vgId:%d, failed to set raw status at line:%d", pVg->vgId, __LINE__);
×
1876
  }
1877
  if (code != 0) {
10,009!
1878
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
1879
    TAOS_RETURN(code);
×
1880
  }
1881
  pRaw = NULL;
10,009✔
1882
  TAOS_RETURN(code);
10,009✔
1883

1884
_err:
×
1885
  sdbFreeRaw(pRaw);
×
1886
  TAOS_RETURN(code);
×
1887
}
1888

1889
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
1,887✔
1890
  int32_t    code = 0;
1,887✔
1891
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
1,887✔
1892
  if (pDnode == NULL) {
1,887!
1893
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1894
    if (terrno != 0) code = terrno;
×
1895
    TAOS_RETURN(code);
×
1896
  }
1897

1898
  STransAction action = {0};
1,887✔
1899
  action.epSet = mndGetDnodeEpset(pDnode);
1,887✔
1900
  mndReleaseDnode(pMnode, pDnode);
1,887✔
1901

1902
  int32_t contLen = 0;
1,887✔
1903
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
1,887✔
1904
  if (pReq == NULL) {
1,887!
1905
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1906
    if (terrno != 0) code = terrno;
×
1907
    TAOS_RETURN(code);
×
1908
  }
1909

1910
  action.pCont = pReq;
1,887✔
1911
  action.contLen = contLen;
1,887✔
1912
  action.msgType = TDMT_VND_ALTER_REPLICA;
1,887✔
1913
  action.groupId = pVgroup->vgId;
1,887✔
1914

1915
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
1,887!
1916
    taosMemoryFree(pReq);
×
1917
    TAOS_RETURN(code);
×
1918
  }
1919

1920
  TAOS_RETURN(code);
1,887✔
1921
}
1922

1923
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
×
1924
  int32_t    code = 0;
×
1925
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
1926
  if (pDnode == NULL) {
×
1927
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1928
    if (terrno != 0) code = terrno;
×
1929
    TAOS_RETURN(code);
×
1930
  }
1931

1932
  STransAction action = {0};
×
1933
  action.epSet = mndGetDnodeEpset(pDnode);
×
1934
  mndReleaseDnode(pMnode, pDnode);
×
1935

1936
  int32_t contLen = 0;
×
1937
  void   *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
×
1938
  if (pReq == NULL) {
×
1939
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1940
    if (terrno != 0) code = terrno;
×
1941
    TAOS_RETURN(code);
×
1942
  }
1943

1944
  action.pCont = pReq;
×
1945
  action.contLen = contLen;
×
1946
  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
×
1947
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1948
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
×
1949

1950
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1951
    taosMemoryFree(pReq);
×
1952
    TAOS_RETURN(code);
×
1953
  }
1954

1955
  TAOS_RETURN(code);
×
1956
}
1957

1958
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
256✔
1959
  int32_t    code = 0;
256✔
1960
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
256✔
1961
  if (pDnode == NULL) {
256!
1962
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1963
    if (terrno != 0) code = terrno;
×
1964
    TAOS_RETURN(code);
×
1965
  }
1966

1967
  STransAction action = {0};
256✔
1968
  action.epSet = mndGetDnodeEpset(pDnode);
256✔
1969
  mndReleaseDnode(pMnode, pDnode);
256✔
1970

1971
  int32_t contLen = 0;
256✔
1972
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
256✔
1973
  if (pReq == NULL) {
256!
1974
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1975
    if (terrno != 0) code = terrno;
×
1976
    TAOS_RETURN(code);
×
1977
  }
1978

1979
  action.pCont = pReq;
256✔
1980
  action.contLen = contLen;
256✔
1981
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
256✔
1982
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
256✔
1983
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
256✔
1984
  action.groupId = pVgroup->vgId;
256✔
1985

1986
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
256!
1987
    taosMemoryFree(pReq);
×
1988
    TAOS_RETURN(code);
×
1989
  }
1990

1991
  TAOS_RETURN(code);
256✔
1992
}
1993

1994
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1995
                                          SDnodeObj *pDnode) {
1996
  int32_t      code = 0;
10✔
1997
  STransAction action = {0};
10✔
1998
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1999

2000
  int32_t contLen = 0;
10✔
2001
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
10✔
2002
  if (pReq == NULL) {
10!
2003
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2004
    if (terrno != 0) code = terrno;
×
2005
    TAOS_RETURN(code);
×
2006
  }
2007

2008
  action.pCont = pReq;
10✔
2009
  action.contLen = contLen;
10✔
2010
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
10✔
2011
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
10✔
2012
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
10✔
2013
  action.groupId = pVgroup->vgId;
10✔
2014

2015
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
2016
    taosMemoryFree(pReq);
×
2017
    TAOS_RETURN(code);
×
2018
  }
2019

2020
  TAOS_RETURN(code);
10✔
2021
}
2022

2023
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
56✔
2024
                                             int32_t dnodeId) {
2025
  int32_t    code = 0;
56✔
2026
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
56✔
2027
  if (pDnode == NULL) {
56!
2028
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2029
    if (terrno != 0) code = terrno;
×
2030
    TAOS_RETURN(code);
×
2031
  }
2032

2033
  STransAction action = {0};
56✔
2034
  action.epSet = mndGetDnodeEpset(pDnode);
56✔
2035
  mndReleaseDnode(pMnode, pDnode);
56✔
2036

2037
  int32_t contLen = 0;
56✔
2038
  void   *pReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &contLen);
56✔
2039
  if (pReq == NULL) {
56!
2040
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2041
    if (terrno != 0) code = terrno;
×
2042
    TAOS_RETURN(code);
×
2043
  }
2044

2045
  action.pCont = pReq;
56✔
2046
  action.contLen = contLen;
56✔
2047
  action.msgType = TDMT_VND_DISABLE_WRITE;
56✔
2048

2049
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
56!
2050
    taosMemoryFree(pReq);
×
2051
    TAOS_RETURN(code);
×
2052
  }
2053

2054
  TAOS_RETURN(code);
56✔
2055
}
2056

2057
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
15,508✔
2058
                              bool isRedo) {
2059
  int32_t      code = 0;
15,508✔
2060
  STransAction action = {0};
15,508✔
2061

2062
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
15,508✔
2063
  if (pDnode == NULL) {
15,508!
2064
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2065
    if (terrno != 0) code = terrno;
×
2066
    TAOS_RETURN(code);
×
2067
  }
2068
  action.epSet = mndGetDnodeEpset(pDnode);
15,508✔
2069
  mndReleaseDnode(pMnode, pDnode);
15,508✔
2070

2071
  int32_t contLen = 0;
15,508✔
2072
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
15,508✔
2073
  if (pReq == NULL) {
15,508!
2074
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2075
    if (terrno != 0) code = terrno;
×
2076
    TAOS_RETURN(code);
×
2077
  }
2078

2079
  action.pCont = pReq;
15,508✔
2080
  action.contLen = contLen;
15,508✔
2081
  action.msgType = TDMT_DND_DROP_VNODE;
15,508✔
2082
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
15,508✔
2083
  action.groupId = pVgroup->vgId;
15,508✔
2084

2085
  if (isRedo) {
15,508✔
2086
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
4,614!
2087
      taosMemoryFree(pReq);
×
2088
      TAOS_RETURN(code);
×
2089
    }
2090
  } else {
2091
    if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
10,894!
2092
      taosMemoryFree(pReq);
×
2093
      TAOS_RETURN(code);
×
2094
    }
2095
  }
2096

2097
  TAOS_RETURN(code);
15,508✔
2098
}
2099

2100
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
59✔
2101
                                    SArray *pArray, bool force, bool unsafe) {
2102
  int32_t code = 0;
59✔
2103
  SVgObj  newVg = {0};
59✔
2104
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
59✔
2105

2106
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
59!
2107
  for (int32_t i = 0; i < newVg.replica; ++i) {
188✔
2108
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
129!
2109
  }
2110

2111
  if (!force) {
59✔
2112
#if 1
2113
    {
2114
#else
2115
    if (newVg.replica == 1) {
2116
#endif
2117
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
51!
2118
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
51✔
2119
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
162✔
2120
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
112!
2121
      }
2122
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
50!
2123
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
50!
2124

2125
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
50!
2126
      newVg.replica--;
50✔
2127
      SVnodeGid del = newVg.vnodeGid[vnIndex];
50✔
2128
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
50✔
2129
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
50✔
2130
      {
2131
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
50✔
2132
        if (pRaw == NULL) {
50!
2133
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2134
          if (terrno != 0) code = terrno;
×
2135
          TAOS_RETURN(code);
×
2136
        }
2137
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
50!
2138
          sdbFreeRaw(pRaw);
×
2139
          TAOS_RETURN(code);
×
2140
        }
2141
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
50✔
2142
        if (code != 0) {
50!
2143
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2144
          return code;
×
2145
        }
2146
      }
2147

2148
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
50!
2149
      for (int32_t i = 0; i < newVg.replica; ++i) {
162✔
2150
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
112!
2151
      }
2152
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
50!
2153
#if 1
2154
    }
2155
#else
2156
    } else {  // new replica == 3
2157
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2158
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2159
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2160
      newVg.replica--;
2161
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2162
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2163
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2164
      {
2165
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2166
        if (pRaw == NULL) return -1;
2167
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2168
          sdbFreeRaw(pRaw);
2169
          return -1;
2170
        }
2171
      }
2172

2173
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2174
      for (int32_t i = 0; i < newVg.replica; ++i) {
2175
        if (i == vnIndex) continue;
2176
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2177
      }
2178
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2179
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2180
    }
2181
#endif
2182
  } else {
2183
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
8!
2184
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
8!
2185
    newVg.replica--;
8✔
2186
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2187
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
8✔
2188
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
8✔
2189
    {
2190
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
8✔
2191
      if (pRaw == NULL) {
8!
2192
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2193
        if (terrno != 0) code = terrno;
×
2194
        TAOS_RETURN(code);
×
2195
      }
2196
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
8!
2197
        sdbFreeRaw(pRaw);
×
2198
        TAOS_RETURN(code);
×
2199
      }
2200
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
8✔
2201
      if (code != 0) {
8!
2202
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2203
        return code;
×
2204
      }
2205
    }
2206

2207
    for (int32_t i = 0; i < newVg.replica; ++i) {
24✔
2208
      if (i != vnIndex) {
16✔
2209
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
8!
2210
      }
2211
    }
2212
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
8!
2213
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
8!
2214

2215
    if (newVg.replica == 1) {
8✔
2216
      if (force && !unsafe) {
4!
2217
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
2✔
2218
      }
2219

2220
      SSdb *pSdb = pMnode->pSdb;
2✔
2221
      void *pIter = NULL;
2✔
2222

2223
      while (1) {
6✔
2224
        SStbObj *pStb = NULL;
8✔
2225
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
8✔
2226
        if (pIter == NULL) break;
8✔
2227

2228
        if (strcmp(pStb->db, pDb->name) == 0) {
6✔
2229
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
4!
2230
            sdbCancelFetch(pSdb, pIter);
×
2231
            sdbRelease(pSdb, pStb);
×
2232
            TAOS_RETURN(code);
×
2233
          }
2234
        }
2235

2236
        sdbRelease(pSdb, pStb);
6✔
2237
      }
2238

2239
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
2!
2240
    }
2241
  }
2242

2243
  {
2244
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
56✔
2245
    if (pRaw == NULL) {
56!
2246
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2247
      if (terrno != 0) code = terrno;
×
2248
      TAOS_RETURN(code);
×
2249
    }
2250
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
56!
2251
      sdbFreeRaw(pRaw);
×
2252
      TAOS_RETURN(code);
×
2253
    }
2254
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
56✔
2255
    if (code != 0) {
56!
2256
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2257
      return code;
×
2258
    }
2259
  }
2260

2261
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
56!
2262
  for (int32_t i = 0; i < newVg.replica; ++i) {
182✔
2263
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
126!
2264
  }
2265
  TAOS_RETURN(code);
56✔
2266
}
2267

2268
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
27✔
2269
  int32_t code = 0;
27✔
2270
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
27✔
2271
  if (pArray == NULL) {
27!
2272
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2273
    if (terrno != 0) code = terrno;
×
2274
    TAOS_RETURN(code);
×
2275
  }
2276

2277
  void *pIter = NULL;
27✔
2278
  while (1) {
78✔
2279
    SVgObj *pVgroup = NULL;
105✔
2280
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
105✔
2281
    if (pIter == NULL) break;
105✔
2282

2283
    int32_t vnIndex = -1;
81✔
2284
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
146✔
2285
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
124✔
2286
        vnIndex = i;
59✔
2287
        break;
59✔
2288
      }
2289
    }
2290

2291
    code = 0;
81✔
2292
    if (vnIndex != -1) {
81✔
2293
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
59!
2294
            delDnodeId, force);
2295
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
59✔
2296
      code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
59✔
2297
      mndReleaseDb(pMnode, pDb);
59✔
2298
    }
2299

2300
    sdbRelease(pMnode->pSdb, pVgroup);
81✔
2301

2302
    if (code != 0) {
81✔
2303
      sdbCancelFetch(pMnode->pSdb, pIter);
3✔
2304
      break;
3✔
2305
    }
2306
  }
2307

2308
  taosArrayDestroy(pArray);
27✔
2309
  TAOS_RETURN(code);
27✔
2310
}
2311

2312
/*
 * Appends to pTrans the steps that grow pVgroup by one replica hosted on dnode `newDnodeId`:
 *   1. the new vnode is recorded in the next free vnodeGid slot with role LEARNER and state OFFLINE, and the
 *      updated vgroup is appended to the redo log with status READY;
 *   2. an alter-replica action is appended for every pre-existing replica, then a create-vnode action for the new one;
 *   3. the new vnode's role is switched to VOTER, an alter-type action is appended for it, and the alter-replica
 *      actions for the pre-existing replicas are appended again;
 *   4. an alter-confirm action is appended.
 * pVgroup is modified in place (replica count and vnodeGid).
 *
 * Returns 0 on success, TSDB_CODE_MND_INVALID_REPLICA when vnodeGid has no free slot, or the first error hit.
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // vnodeGid[replica] is written below: refuse to index past the end of the array
  const int32_t maxSlots = (int32_t)(sizeof(pVgroup->vnodeGid) / sizeof(pVgroup->vnodeGid[0]));
  if (pVgroup->replica >= maxSlots) {
    mError("vgId:%d, failed to add vnode on dnode:%d, replica:%d already reaches capacity:%d", pVgroup->vgId,
           newDnodeId, pVgroup->replica, maxSlots);
    TAOS_RETURN(TSDB_CODE_MND_INVALID_REPLICA);
  }

  // assoc dnode
  SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
  pVgroup->replica++;
  pGid->dnodeId = newDnodeId;
  pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
  pGid->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // learner: the last slot is the new vnode, so only the pre-existing replicas get the alter-replica action
  for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid));

  // voter
  pGid->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pGid->dnodeId));
  for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }

  // confirm
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2358

2359
/*
 * Appends to pTrans the steps that shrink pVgroup by removing its replica on dnode `delDnodeId`.
 * The slot of the removed vnode is filled with the last replica, and the vacated last slot is zeroed. The updated
 * vgroup is appended to the redo log with status READY. Then the following actions are appended: drop-vnode for
 * the removed vnode, alter-replica for every remaining replica, and alter-confirm.
 * pVgroup is modified in place.
 *
 * Returns 0 without touching anything when pVgroup has no replica on delDnodeId; otherwise 0 on success or the first
 * error hit.
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  int32_t pos = 0;
  while (pos < pVgroup->replica && pVgroup->vnodeGid[pos].dnodeId != delDnodeId) {
    ++pos;
  }
  if (pos >= pVgroup->replica) {
    return 0;
  }

  // keep a copy of the leaving vnode, then compact the array by moving the last replica into its slot
  SVnodeGid removed = pVgroup->vnodeGid[pos];
  pVgroup->replica--;
  pVgroup->vnodeGid[pos] = pVgroup->vnodeGid[pVgroup->replica];
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &removed, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2404

2405
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
102✔
2406
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2407
                                     SDnodeObj *pOld3) {
2408
  int32_t code = -1;
102✔
2409
  STrans *pTrans = NULL;
102✔
2410

2411
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
102✔
2412
  if (pTrans == NULL) {
102!
2413
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2414
    if (terrno != 0) code = terrno;
×
2415
    goto _OVER;
×
2416
  }
2417

2418
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
102✔
2419
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
102✔
2420

2421
  mndTransSetSerial(pTrans);
101✔
2422
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
101!
2423

2424
  SVgObj newVg = {0};
101✔
2425
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
101✔
2426
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
101!
2427
  for (int32_t i = 0; i < newVg.replica; ++i) {
342✔
2428
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
241!
2429
          syncStr(newVg.vnodeGid[i].syncState));
2430
  }
2431

2432
  if (pNew1 != NULL && pOld1 != NULL) {
101!
2433
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
101✔
2434
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
101✔
2435
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
2!
2436
             pNew1->numOfSupportVnodes);
2437
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
2✔
2438
      goto _OVER;
2✔
2439
    }
2440

2441
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
99✔
2442
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
99!
2443
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2444
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2445
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2446
      goto _OVER;
×
2447
    } else {
2448
      pNew1->memUsed += vgMem;
99✔
2449
    }
2450

2451
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
99!
2452
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
99!
2453
  }
2454

2455
  if (pNew2 != NULL && pOld2 != NULL) {
99!
2456
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
28✔
2457
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
28!
2458
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2459
             pNew2->numOfSupportVnodes);
2460
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2461
      goto _OVER;
×
2462
    }
2463
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
28✔
2464
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
28!
2465
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2466
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2467
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2468
      goto _OVER;
×
2469
    } else {
2470
      pNew2->memUsed += vgMem;
28✔
2471
    }
2472
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
28!
2473
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
28!
2474
  }
2475

2476
  if (pNew3 != NULL && pOld3 != NULL) {
99!
2477
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
12✔
2478
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
12!
2479
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2480
             pNew3->numOfSupportVnodes);
2481
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2482
      goto _OVER;
×
2483
    }
2484
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
12✔
2485
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
12!
2486
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2487
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2488
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2489
      goto _OVER;
×
2490
    } else {
2491
      pNew3->memUsed += vgMem;
12✔
2492
    }
2493
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
12!
2494
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
12!
2495
  }
2496

2497
  {
2498
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
99✔
2499
    if (pRaw == NULL) {
99!
2500
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2501
      if (terrno != 0) code = terrno;
×
2502
      goto _OVER;
×
2503
    }
2504
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
99!
2505
      sdbFreeRaw(pRaw);
×
2506
      goto _OVER;
×
2507
    }
2508
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
99✔
2509
    if (code != 0) {
99!
2510
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2511
      goto _OVER;
×
2512
    }
2513
  }
2514

2515
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
99!
2516
  for (int32_t i = 0; i < newVg.replica; ++i) {
334✔
2517
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
235!
2518
  }
2519

2520
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
99!
2521
  code = 0;
99✔
2522

2523
_OVER:
102✔
2524
  mndTransDrop(pTrans);
102✔
2525
  mndReleaseDb(pMnode, pDb);
102✔
2526
  TAOS_RETURN(code);
102✔
2527
}
2528

2529
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
128✔
2530
  SMnode    *pMnode = pReq->info.node;
128✔
2531
  SDnodeObj *pNew1 = NULL;
128✔
2532
  SDnodeObj *pNew2 = NULL;
128✔
2533
  SDnodeObj *pNew3 = NULL;
128✔
2534
  SDnodeObj *pOld1 = NULL;
128✔
2535
  SDnodeObj *pOld2 = NULL;
128✔
2536
  SDnodeObj *pOld3 = NULL;
128✔
2537
  SVgObj    *pVgroup = NULL;
128✔
2538
  SDbObj    *pDb = NULL;
128✔
2539
  int32_t    code = -1;
128✔
2540
  int64_t    curMs = taosGetTimestampMs();
128✔
2541
  int32_t    newDnodeId[3] = {0};
128✔
2542
  int32_t    oldDnodeId[3] = {0};
128✔
2543
  int32_t    newIndex = -1;
128✔
2544
  int32_t    oldIndex = -1;
128✔
2545

2546
  SRedistributeVgroupReq req = {0};
128✔
2547
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
128!
2548
    code = TSDB_CODE_INVALID_MSG;
×
2549
    goto _OVER;
×
2550
  }
2551

2552
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
128!
2553
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
128✔
2554
    goto _OVER;
1✔
2555
  }
2556

2557
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
127✔
2558
  if (pVgroup == NULL) {
127✔
2559
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
6✔
2560
    if (terrno != 0) code = terrno;
6!
2561
    goto _OVER;
6✔
2562
  }
2563
  if (pVgroup->mountVgId) {
121!
2564
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
2565
    goto _OVER;
×
2566
  }
2567
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
121✔
2568
  if (pDb == NULL) {
121!
2569
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2570
    if (terrno != 0) code = terrno;
×
2571
    goto _OVER;
×
2572
  }
2573

2574
  if (pVgroup->replica == 1) {
121✔
2575
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
35!
2576
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2577
      goto _OVER;
×
2578
    }
2579

2580
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
35✔
2581
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2582
      code = 0;
1✔
2583
      goto _OVER;
1✔
2584
    }
2585

2586
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
34✔
2587
    if (pNew1 == NULL) {
34!
2588
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2589
      if (terrno != 0) code = terrno;
×
2590
      goto _OVER;
×
2591
    }
2592
    if (!mndIsDnodeOnline(pNew1, curMs)) {
34!
2593
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2594
      goto _OVER;
×
2595
    }
2596

2597
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
34✔
2598
    if (pOld1 == NULL) {
34!
2599
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2600
      if (terrno != 0) code = terrno;
×
2601
      goto _OVER;
×
2602
    }
2603
    if (!mndIsDnodeOnline(pOld1, curMs)) {
34✔
2604
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2605
      goto _OVER;
2✔
2606
    }
2607

2608
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
32✔
2609

2610
  } else if (pVgroup->replica == 3) {
86!
2611
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
86!
2612
      code = TSDB_CODE_MND_INVALID_REPLICA;
8✔
2613
      goto _OVER;
8✔
2614
    }
2615

2616
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
78!
2617
      code = TSDB_CODE_MND_INVALID_REPLICA;
2✔
2618
      goto _OVER;
2✔
2619
    }
2620

2621
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
76✔
2622
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
45✔
2623
      newDnodeId[++newIndex] = req.dnodeId1;
31✔
2624
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
31!
2625
    }
2626

2627
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
76✔
2628
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
48✔
2629
      newDnodeId[++newIndex] = req.dnodeId2;
38✔
2630
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
38!
2631
    }
2632

2633
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
76✔
2634
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
60✔
2635
      newDnodeId[++newIndex] = req.dnodeId3;
49✔
2636
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
49!
2637
    }
2638

2639
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
76✔
2640
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
50✔
2641
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
39✔
2642
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
39!
2643
    }
2644

2645
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
76✔
2646
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
43✔
2647
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
38✔
2648
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
38!
2649
    }
2650

2651
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
76✔
2652
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
52✔
2653
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
41✔
2654
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
41!
2655
    }
2656

2657
    if (newDnodeId[0] != 0) {
76✔
2658
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
74✔
2659
      if (pNew1 == NULL) {
74!
2660
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2661
        if (terrno != 0) code = terrno;
×
2662
        goto _OVER;
×
2663
      }
2664
      if (!mndIsDnodeOnline(pNew1, curMs)) {
74✔
2665
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2666
        goto _OVER;
2✔
2667
      }
2668
    }
2669

2670
    if (newDnodeId[1] != 0) {
74✔
2671
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
30✔
2672
      if (pNew2 == NULL) {
30!
2673
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2674
        if (terrno != 0) code = terrno;
×
2675
        goto _OVER;
×
2676
      }
2677
      if (!mndIsDnodeOnline(pNew2, curMs)) {
30!
2678
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2679
        goto _OVER;
×
2680
      }
2681
    }
2682

2683
    if (newDnodeId[2] != 0) {
74✔
2684
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
14✔
2685
      if (pNew3 == NULL) {
14!
2686
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2687
        if (terrno != 0) code = terrno;
×
2688
        goto _OVER;
×
2689
      }
2690
      if (!mndIsDnodeOnline(pNew3, curMs)) {
14!
2691
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2692
        goto _OVER;
×
2693
      }
2694
    }
2695

2696
    if (oldDnodeId[0] != 0) {
74✔
2697
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
72✔
2698
      if (pOld1 == NULL) {
72!
2699
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2700
        if (terrno != 0) code = terrno;
×
2701
        goto _OVER;
×
2702
      }
2703
      if (!mndIsDnodeOnline(pOld1, curMs)) {
72✔
2704
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2705
        goto _OVER;
2✔
2706
      }
2707
    }
2708

2709
    if (oldDnodeId[1] != 0) {
72✔
2710
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
28✔
2711
      if (pOld2 == NULL) {
28!
2712
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2713
        if (terrno != 0) code = terrno;
×
2714
        goto _OVER;
×
2715
      }
2716
      if (!mndIsDnodeOnline(pOld2, curMs)) {
28!
2717
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2718
        goto _OVER;
×
2719
      }
2720
    }
2721

2722
    if (oldDnodeId[2] != 0) {
72✔
2723
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
12✔
2724
      if (pOld3 == NULL) {
12!
2725
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2726
        if (terrno != 0) code = terrno;
×
2727
        goto _OVER;
×
2728
      }
2729
      if (!mndIsDnodeOnline(pOld3, curMs)) {
12!
2730
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2731
        goto _OVER;
×
2732
      }
2733
    }
2734

2735
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
72!
2736
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2737
      code = 0;
2✔
2738
      goto _OVER;
2✔
2739
    }
2740

2741
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
70✔
2742

2743
  } else {
2744
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2745
    goto _OVER;
×
2746
  }
2747

2748
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
102✔
2749

2750
  char obj[33] = {0};
102✔
2751
  (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
102✔
2752

2753
  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
102✔
2754

2755
_OVER:
128✔
2756
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
128✔
2757
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
26!
2758
           req.dnodeId3, tstrerror(code));
2759
  }
2760

2761
  mndReleaseDnode(pMnode, pNew1);
128✔
2762
  mndReleaseDnode(pMnode, pNew2);
128✔
2763
  mndReleaseDnode(pMnode, pNew3);
128✔
2764
  mndReleaseDnode(pMnode, pOld1);
128✔
2765
  mndReleaseDnode(pMnode, pOld2);
128✔
2766
  mndReleaseDnode(pMnode, pOld3);
128✔
2767
  mndReleaseVgroup(pMnode, pVgroup);
128✔
2768
  mndReleaseDb(pMnode, pDb);
128✔
2769
  tFreeSRedistributeVgroupReq(&req);
128✔
2770

2771
  TAOS_RETURN(code);
128✔
2772
}
2773

2774
/*
 * Builds the wire message for a SForceBecomeFollowerReq on pVgroup. The message is an SMsgHead (total contLen and
 * vgId, both converted with htonl) followed by the serialized request body. The buffer is allocated with
 * taosMemoryMalloc, and ownership passes to the caller. The total length is stored in *pContLen.
 *
 * pMnode and dnodeId are not used by this builder.
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  (void)pMnode;
  (void)dnodeId;

  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // first pass with a NULL buffer only measures the serialized body
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // the body starts after the header, so only bodyLen bytes are available to the serializer
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
2804

2805
/*
 * Appends to pTrans a redo action that sends a TDMT_SYNC_FORCE_FOLLOWER request for pVgroup to the endpoint of
 * dnode `dnodeId`.
 * Returns 0 on success. On failure it returns the error code, freeing the request buffer here if appending it failed.
 */
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  int32_t contLen = 0;
  void   *pCont = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen);
  if (pCont == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  action.pCont = pCont;
  action.contLen = contLen;
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
2837

2838
/*
 * For a vgroup with more than one replica, finds the dnode hosting the replica whose syncState is LEADER. If that
 * dnode exists and is online, appends a force-follower action for it (mndAddBalanceVgroupLeaderAction) and then
 * checks that the vgroup's db can still be acquired.
 *
 * Returns:
 *   -1 (a bare value, not a TSDB error code) when replica <= 1;
 *    0 when the action was appended, or when the leader dnode is missing or offline (nothing is appended then);
 *   the failing error code otherwise.
 */
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans) {
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (pVgroup->replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // dnode of the current leader replica; stays 0 when no replica reports LEADER
  int32_t leaderDnodeId = 0;
  for (int i = 0; i < replica; i++) {
    if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER) {
      leaderDnodeId = pVgroup->vnodeGid[i].dnodeId;
      break;
    }
  }

  bool       exist = false;
  bool       online = false;
  int64_t    nowMs = taosGetTimestampMs();
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, leaderDnodeId);
  if (pDnode != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pDnode, nowMs);
    mndReleaseDnode(pMnode, pDnode);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, leaderDnodeId, exist,
          online);
    TAOS_RETURN(0);
  }

  mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, leaderDnodeId);

  int32_t code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, leaderDnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, leaderDnodeId);
    TAOS_RETURN(code);
  }

  // the db handle is only acquired to confirm the db still exists; it is released straight away
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid,
           leaderDnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
2893

2894
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

/*
 * Message handler for vgroup leader balancing; forwards the request to mndProcessVgroupBalanceLeaderMsgImp.
 * When TD_ENTERPRISE is not defined, the no-op stub below provides that implementation. Presumably the real
 * implementation lives in the enterprise sources; confirm in the build.
 */
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  return mndProcessVgroupBalanceLeaderMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Community-build stub: ignores the request and reports success.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
2901

2902
/*
 * Re-estimates memory usage on every dnode listed in pArray after a vgroup change.
 *
 * For each dnode, the footprint of pOldVgroup (computed with pOldDb) is subtracted
 * if the dnode hosts one of its replicas, and the footprint of pNewVgroup (computed
 * with pNewDb) is added if the dnode hosts one of its replicas. The adjusted value
 * is written back to the dnode entry's memUsed.
 *
 * Returns 0 when every dnode keeps memAvail > memUsed; otherwise returns
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE via TAOS_RETURN at the first dnode that runs out.
 */
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  int32_t numOfDnodes = (int32_t)taosArrayGetSize(pArray);

  for (int32_t d = 0; d < numOfDnodes; ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    bool       hosted = false;
    int64_t    released = 0;
    int64_t    required = 0;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pDnode->id, pDnode->memAvail, pDnode->memUsed);

    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pDnode->id) {
        continue;
      }
      released = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hosted = true;
    }

    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pDnode->id) {
        continue;
      }
      required = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hosted = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pDnode->id, released, required);

    pDnode->memUsed = pDnode->memUsed - released + required;

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    if (hosted) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }

  return 0;
}
2942

2943
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
565✔
2944
                                  SArray *pArray, SVgObj *pNewVgroup) {
2945
  int32_t code = 0;
565✔
2946
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
565✔
2947

2948
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
565!
2949
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
483!
2950
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
483!
2951
    return 0;
483✔
2952
  }
2953

2954
  // mndTransSetGroupParallel(pTrans);
2955

2956
  if (pNewDb->cfg.replications == 3) {
82✔
2957
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
74!
2958
          pVgroup->vnodeGid[0].dnodeId);
2959

2960
    // add second
2961
    if (pNewVgroup->replica == 1) {
74!
2962
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
74✔
2963
    }
2964

2965
    // learner stage
2966
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
73✔
2967
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
73✔
2968
    TAOS_CHECK_RETURN(
73!
2969
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2970

2971
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
73!
2972

2973
    // follower stage
2974
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
73✔
2975
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
73!
2976
    TAOS_CHECK_RETURN(
73!
2977
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2978

2979
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
73!
2980

2981
    // add third
2982
    if (pNewVgroup->replica == 2) {
73!
2983
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
73✔
2984
    }
2985

2986
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
62✔
2987
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
62✔
2988
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
62✔
2989
    TAOS_CHECK_RETURN(
62!
2990
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2991
    TAOS_CHECK_RETURN(
62!
2992
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
2993
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
62!
2994

2995
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
62!
2996
  } else if (pNewDb->cfg.replications == 1) {
8!
2997
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
8!
2998
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
2999
          pVgroup->vnodeGid[2].dnodeId);
3000

3001
    SVnodeGid del1 = {0};
8✔
3002
    SVnodeGid del2 = {0};
8✔
3003
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
8!
3004
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
8!
3005
    TAOS_CHECK_RETURN(
8!
3006
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3007
    TAOS_CHECK_RETURN(
8!
3008
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3009
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
8!
3010

3011
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
8!
3012
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
8!
3013
    TAOS_CHECK_RETURN(
8!
3014
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3015
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
8!
3016
  } else if (pNewDb->cfg.replications == 2) {
×
3017
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
×
3018
          pVgroup->vnodeGid[0].dnodeId);
3019

3020
    // add second
3021
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
3022

3023
    // learner stage
3024
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3025
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3026
    TAOS_CHECK_RETURN(
×
3027
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3028

3029
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
×
3030

3031
    // follower stage
3032
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3033
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
×
3034
    TAOS_CHECK_RETURN(
×
3035
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3036

3037
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
3038
  } else {
3039
    return -1;
×
3040
  }
3041

3042
  mndSortVnodeGid(pNewVgroup);
70✔
3043

3044
  {
3045
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
70✔
3046
    if (pVgRaw == NULL) {
70!
3047
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3048
      if (terrno != 0) code = terrno;
×
3049
      TAOS_RETURN(code);
×
3050
    }
3051
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
70!
3052
      sdbFreeRaw(pVgRaw);
×
3053
      TAOS_RETURN(code);
×
3054
    }
3055
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
70✔
3056
    if (code != 0) {
70!
3057
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3058
      TAOS_RETURN(code);
×
3059
    }
3060
  }
3061

3062
  TAOS_RETURN(code);
70✔
3063
}
3064

3065
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3066
                                      SArray *pArray) {
3067
  int32_t code = 0;
×
3068
  SVgObj  newVgroup = {0};
×
3069
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3070

3071
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3072
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3073
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3074
    return 0;
×
3075
  }
3076

3077
  mndTransSetSerial(pTrans);
×
3078

3079
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3080
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3081

3082
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3083
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3084
          pVgroup->vnodeGid[0].dnodeId);
3085

3086
    // add second
3087
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3088
    // add third
3089
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3090

3091
    // add learner stage
3092
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3093
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3094
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3095
    TAOS_CHECK_RETURN(
×
3096
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3097
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3098
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3099
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3100
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3101
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3102
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3103
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3104
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3105

3106
    // check learner
3107
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3108
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3109
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3110
    TAOS_CHECK_RETURN(
×
3111
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3112
    TAOS_CHECK_RETURN(
×
3113
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3114

3115
    // change raft type
3116
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3117
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3118
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3119
    TAOS_CHECK_RETURN(
×
3120
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3121

3122
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3123

3124
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3125
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3126
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3127
    TAOS_CHECK_RETURN(
×
3128
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3129

3130
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3131

3132
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3133
    if (pVgRaw == NULL) {
×
3134
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3135
      if (terrno != 0) code = terrno;
×
3136
      TAOS_RETURN(code);
×
3137
    }
3138
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3139
      sdbFreeRaw(pVgRaw);
×
3140
      TAOS_RETURN(code);
×
3141
    }
3142
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3143
    if (code != 0) {
×
3144
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3145
             __LINE__);
3146
      TAOS_RETURN(code);
×
3147
    }
3148
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3149
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3150
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3151

3152
    SVnodeGid del1 = {0};
×
3153
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3154

3155
    TAOS_CHECK_RETURN(
×
3156
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3157

3158
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3159

3160
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3161

3162
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3163
    if (pVgRaw == NULL) {
×
3164
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3165
      if (terrno != 0) code = terrno;
×
3166
      TAOS_RETURN(code);
×
3167
    }
3168
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3169
      sdbFreeRaw(pVgRaw);
×
3170
      TAOS_RETURN(code);
×
3171
    }
3172
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3173
    if (code != 0) {
×
3174
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3175
             __LINE__);
3176
      TAOS_RETURN(code);
×
3177
    }
3178

3179
    SVnodeGid del2 = {0};
×
3180
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3181

3182
    TAOS_CHECK_RETURN(
×
3183
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3184

3185
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3186

3187
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3188

3189
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3190
    if (pVgRaw == NULL) {
×
3191
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3192
      if (terrno != 0) code = terrno;
×
3193
      TAOS_RETURN(code);
×
3194
    }
3195
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3196
      sdbFreeRaw(pVgRaw);
×
3197
      TAOS_RETURN(code);
×
3198
    }
3199
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3200
    if (code != 0) {
×
3201
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3202
             __LINE__);
3203
      TAOS_RETURN(code);
×
3204
    }
3205
  } else {
3206
    return -1;
×
3207
  }
3208

3209
  mndSortVnodeGid(&newVgroup);
×
3210

3211
  {
3212
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3213
    if (pVgRaw == NULL) {
×
3214
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3215
      if (terrno != 0) code = terrno;
×
3216
      TAOS_RETURN(code);
×
3217
    }
3218
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3219
      sdbFreeRaw(pVgRaw);
×
3220
      TAOS_RETURN(code);
×
3221
    }
3222
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3223
    if (code != 0) {
×
3224
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3225
             __LINE__);
3226
      TAOS_RETURN(code);
×
3227
    }
3228
  }
3229

3230
  TAOS_RETURN(code);
×
3231
}
3232

3233
/*
 * Builds the actions that recreate the vnode of pVgroup located on pDnode.
 *
 *  - replica 1: all roles are set to voter and a create-vnode action is added
 *    for the replica on pDnode (index 0 if no replica matches).
 *  - replica 2: pDnode's replica is marked learner, pAnotherDnode's vnode type
 *    is altered, pDnode's vnode is created, then both are altered back to voter.
 *  - replica 3: pDnode's replica is created as a learner, then promoted.
 *  - other replica counts: no vnode actions are added.
 * In every case the resulting layout is appended to the commit log as READY.
 */
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  switch (newVgroup.replica) {
    case 1: {
      int selected = 0;
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
        if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
          selected = i;
        }
      }
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]));
      break;
    }

    case 2: {
      // the restored replica is a learner, the surviving one a voter
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole =
            (newVgroup.vnodeGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

      // roles are re-applied before the create action
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole =
            (newVgroup.vnodeGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
      break;
    }

    case 3: {
      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole =
            (newVgroup.vnodeGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      for (int i = 0; i < newVgroup.replica; i++) {
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      break;
    }

    default:
      break;
  }

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
  if (pVgRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3312

3313
/* Placeholder: adds no transaction action and always reports success (0). */
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;
  return 0;
}
3316

3317
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3318

3319
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
109✔
3320
  int32_t         code = 0;
109✔
3321
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
109✔
3322
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
109✔
3323
  if (pRaw == NULL) {
109!
3324
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3325
    if (terrno != 0) code = terrno;
×
3326
    goto _err;
×
3327
  }
3328
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
109!
3329
  code = sdbSetRawStatus(pRaw, vgStatus);
109✔
3330
  if (code != 0) {
109!
3331
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
3332
    goto _err;
×
3333
  }
3334
  pRaw = NULL;
109✔
3335
  TAOS_RETURN(code);
109✔
3336
_err:
×
3337
  sdbFreeRaw(pRaw);
×
3338
  TAOS_RETURN(code);
×
3339
}
3340

3341
/*
 * Encodes pDb and appends it to pTrans with status dbStatus. The raw goes to
 * the commit log when stage == TRN_STAGE_COMMIT_ACTION and to the redo log
 * otherwise.
 *
 * Ownership of the encoded raw:
 *  - the append fails: the raw is freed here;
 *  - the append succeeds: the raw belongs to the transaction and must not be
 *    freed here, even if setting its status fails. Freeing it would leave a
 *    dangling entry in the transaction log (double free on transaction drop).
 *
 * Returns 0 on success, or the failing step's error code via TAOS_RETURN.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // pRaw is owned by pTrans from here on; never free it on the error path below.
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3362

3363
/*
 * Splits pVgroup into two single-replica vgroups that divide its hash range in
 * half, then brings each back to pDb->cfg.replications.
 *
 * Transaction outline:
 *  1. Reject the split if the db has streams, if shared storage is enabled
 *     (USE_SHARED_STORAGE builds), or if the db uses an arbitrator.
 *  2. Bring the source vgroup to two replicas: 1 -> 2 adds a learner and
 *     promotes it; 3 -> 2 drops one vnode. Any other replica count is rejected
 *     with TSDB_CODE_OPS_NOT_SUPPORT.
 *  3. Disable writes on both vnodes.
 *  4. Turn each vnode into its own vgroup. The new vgIds are the value of
 *     sdbGetMaxId(SDB_VGROUP) and that value + 1. Each new vgroup gets half of
 *     the original hash range.
 *  5. In the redo log, mark both new vgroups READY and the source vgroup
 *     DROPPED. Bump the db's vgVersion and numOfVgroups.
 *  6. Restore the configured replica count on each new vgroup, then commit the
 *     new vgroup and db state.
 *
 * Returns 0 once the transaction is prepared, otherwise an error code.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    // the dnode list is required to place/remove replicas below
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  int32_t numOfStreams = 0;
  if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) {
    goto _OVER;
  }
  if (numOfStreams > 0) {
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
    goto _OVER;
  }

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  if (pDb->cfg.withArbitrator) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);

  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  // step 2: end up with exactly two replicas
  if (newVg1.replica == 1) {
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // code is 0 here (all earlier checks passed); report an explicit error so the
    // unsupported layout is not mistaken for a successfully prepared transaction.
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, replica:%d, split vgroup not supported", newVg1.vgId, pVgroup->dbName, newVg1.replica);
    goto _OVER;
  }

  // step 3: freeze writes on both vnodes
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // step 4: one vgroup per vnode, each owning half of the hash range
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;  // halves first to avoid uint32 overflow
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  // step 5: redo-log state
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status (pRetensions is deep-copied so _OVER can always destroy dbObj's copy)
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // step 6: adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3531

3532
/*
 * Split-vgroup processing is provided by mndProcessSplitVgroupMsgImp.
 * When TD_ENTERPRISE is not defined, the no-op fallback below is compiled in.
 */
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

/* Message handler for split-vgroup requests; delegates to the implementation. */
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t result = mndProcessSplitVgroupMsgImp(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
/* Fallback used when TD_ENTERPRISE is undefined: ignores the request and returns 0. */
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3539

3540
/*
 * Append to pTrans the actions that move one replica of pVgroup from pSrc to pDst.
 *
 * The function works on a private byte copy of *pVgroup; only the copy is handed to the helpers.
 * First a replica is added on pDst (mndAddIncVgroupReplicaToTrans), then the replica on pSrc is
 * removed (mndAddDecVgroupReplicaFromTrans). Finally the encoded copy is appended as a commit log
 * whose raw status is SDB_STATUS_READY. The vnode layout is logged before and after.
 *
 * Returns 0 on success, otherwise an error code. Every return path goes through TAOS_RETURN, so
 * terrno mirrors the result.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t code = 0;
  SVgObj  moved;
  memcpy(&moved, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", moved.vgId, moved.replica);
  for (int32_t idx = 0; idx < moved.replica; idx++) {
    mInfo("vgId:%d, vnode:%d dnode:%d", moved.vgId, idx, moved.vnodeGid[idx].dnodeId);
  }

  // Grow onto the destination first, then shrink off the source.
  code = mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &moved, pDst->id);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  code = mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &moved, pSrc->id);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  // Persist the final vgroup layout as a commit log.
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&moved);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was not accepted by the transaction, so it is still ours to free.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", moved.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", moved.vgId, moved.replica);
  for (int32_t idx = 0; idx < moved.replica; idx++) {
    mInfo("vgId:%d, vnode:%d dnode:%d", moved.vgId, idx, moved.vnodeGid[idx].dnodeId);
  }
  TAOS_RETURN(code);
}
3577

3578
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
40✔
3579
                                            SHashObj *pBalancedVgroups) {
3580
  void   *pIter = NULL;
40✔
3581
  int32_t code = -1;
40✔
3582
  SSdb   *pSdb = pMnode->pSdb;
40✔
3583

3584
  while (1) {
27✔
3585
    SVgObj *pVgroup = NULL;
67✔
3586
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
67✔
3587
    if (pIter == NULL) break;
67!
3588
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
67✔
3589
      sdbRelease(pSdb, pVgroup);
26✔
3590
      continue;
27✔
3591
    }
3592

3593
    bool existInSrc = false;
41✔
3594
    bool existInDst = false;
41✔
3595
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
122✔
3596
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
81✔
3597
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
81✔
3598
      if (pGid->dnodeId == pDst->id) existInDst = true;
81!
3599
    }
3600

3601
    if (!existInSrc || existInDst) {
41!
3602
      sdbRelease(pSdb, pVgroup);
1✔
3603
      continue;
1✔
3604
    }
3605

3606
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
40✔
3607
    if (pDb == NULL) {
40!
3608
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3609
      if (terrno != 0) code = terrno;
×
3610
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3611
      goto _OUT;
×
3612
    }
3613

3614
    if (pDb->cfg.withArbitrator) {
40!
3615
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3616
      goto _OUT;
×
3617
    }
3618

3619
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
40✔
3620
    if (code == 0) {
40!
3621
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
40✔
3622
    }
3623

3624
  _OUT:
×
3625
    mndReleaseDb(pMnode, pDb);
40✔
3626
    sdbRelease(pSdb, pVgroup);
40✔
3627
    sdbCancelFetch(pSdb, pIter);
40✔
3628
    break;
40✔
3629
  }
3630

3631
  return code;
40✔
3632
}
3633

3634
/*
 * Greedily move vnodes between the dnodes in pArray; all moves go into one serial transaction.
 *
 * Each round does the following:
 *   - Sort pArray with mndCompareDnodeVnodes.
 *   - Take the last element as the source and the first as the destination.
 *   - Compute their projected scores as if one vnode had moved (mndGetDnodeScore with -1 for the
 *     source, +1 for the destination).
 *   - While the source's projected score is not below the destination's (1e-6 tolerance), move one
 *     vgroup with mndBalanceVgroupBetweenDnode and update the in-memory numOfVnodes counters in
 *     pArray, so the next round re-sorts on the new placement.
 *
 * A vgroup is moved at most once per call, because moved vgIds are recorded in pBalancedVgroups.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared, 0 when nothing needed to
 * move, or an error code.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    // Report a real error code rather than the placeholder -1.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // Arguments follow the format string order: vnodes, others, support.
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Projected scores if one vnode moved from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      // NOTE(review): any non-zero result (including real errors) just ends the loop, and earlier
      // moves are still prepared below. Confirm that a failure inside mndBalanceVgroupBetweenDnode
      // cannot leave a partially appended move in pTrans.
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Store the prepare result in code. Previously code could still hold 0 from the last
    // successful move, so a failed prepare was reported to the caller as success.
    code = mndTransPrepare(pMnode, pTrans);
    if (code != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3701

3702
/*
 * Handle a balance-vgroup request.
 *
 * The request is rejected when any of these holds:
 *   - the SBalanceVgroupReq cannot be deserialized;
 *   - the connected user lacks MND_OPER_BALANCE_VGROUP;
 *   - any SDB_MOUNT object exists;
 *   - any dnode is offline.
 *
 * Otherwise the dnode array is built. mndBalanceVgroup runs only when there are at least two dnodes.
 * The request is audited once the dnode array has been built, whatever the balance result.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction was started, 0 when nothing had to be done,
 * or an error code.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  // Balancing is refused while any mount object exists.
  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // Every dnode must be online before any vnode is moved.
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Log the code just assigned. terrstr() would print whatever stale value terrno held.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
3765

3766
/* True when pVgroup is not a TSMA vgroup and its dbUid equals dbUid. */
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
3767

3768
/* True when one of the first pVgroup->replica entries of vnodeGid is placed on dnode dnodeId. */
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool found = false;
  for (int32_t idx = 0; idx < pVgroup->replica && !found; ++idx) {
    found = (pVgroup->vnodeGid[idx].dnodeId == dnodeId);
  }
  return found;
}
3774

3775
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
18✔
3776
                                     STimeWindow tw, bool metaOnly) {
3777
  SCompactVnodeReq compactReq = {0};
18✔
3778
  compactReq.dbUid = pDb->uid;
18✔
3779
  compactReq.compactStartTime = compactTs;
18✔
3780
  compactReq.tw = tw;
18✔
3781
  compactReq.metaOnly = metaOnly;
18✔
3782
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
18✔
3783

3784
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
18!
3785
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
18✔
3786
  if (contLen < 0) {
18!
3787
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3788
    return NULL;
×
3789
  }
3790
  contLen += sizeof(SMsgHead);
18✔
3791

3792
  void *pReq = taosMemoryMalloc(contLen);
18!
3793
  if (pReq == NULL) {
18!
3794
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3795
    return NULL;
×
3796
  }
3797

3798
  SMsgHead *pHead = pReq;
18✔
3799
  pHead->contLen = htonl(contLen);
18✔
3800
  pHead->vgId = htonl(pVgroup->vgId);
18✔
3801

3802
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
18!
3803
    taosMemoryFree(pReq);
×
3804
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3805
    return NULL;
×
3806
  }
3807
  *pContLen = contLen;
18✔
3808
  return pReq;
18✔
3809
}
3810

3811
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
18✔
3812
                                        STimeWindow tw, bool metaOnly) {
3813
  int32_t      code = 0;
18✔
3814
  STransAction action = {0};
18✔
3815
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
18✔
3816

3817
  int32_t contLen = 0;
18✔
3818
  void   *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly);
18✔
3819
  if (pReq == NULL) {
18!
3820
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3821
    if (terrno != 0) code = terrno;
×
3822
    TAOS_RETURN(code);
×
3823
  }
3824

3825
  action.pCont = pReq;
18✔
3826
  action.contLen = contLen;
18✔
3827
  action.msgType = TDMT_VND_COMPACT;
18✔
3828

3829
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
18!
3830
    taosMemoryFree(pReq);
×
3831
    TAOS_RETURN(code);
×
3832
  }
3833

3834
  TAOS_RETURN(code);
18✔
3835
}
3836

3837
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
18✔
3838
                                    STimeWindow tw, bool metaOnly) {
3839
  TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly));
18!
3840
  return 0;
18✔
3841
}
3842

3843
/*
 * Build an ssmigrate request for pVgroup from pMigrateObj's id and start time (nodeId is 0).
 *
 * Layout of the returned buffer (allocated with taosMemoryMalloc and handed to the caller):
 *   [SMsgHead: contLen and vgId, both htonl'd][serialized SSsMigrateVgroupReq]
 * *pContLen receives the total size, header included.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY on any failure. pMnode and pDb are unused.
 */
static void *mndBuildSsMigrateVgroupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, SSsMigrateObj* pMigrateObj) {
  SSsMigrateVgroupReq req = {.ssMigrateId = pMigrateObj->id, .nodeId = 0, .timestamp = pMigrateObj->startTime };

  mInfo("vgId:%d, build ssmigrate vnode config req", pVgroup->vgId);
  int32_t bodyLen = tSerializeSSsMigrateVgroupReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t totalLen = bodyLen + sizeof(SMsgHead);

  char *pBuf = taosMemoryMalloc(totalLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = (SMsgHead *)pBuf;
  pHead->contLen = htonl(totalLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The serialized body follows the header and gets exactly bodyLen bytes.
  if (tSerializeSSsMigrateVgroupReq(pBuf + sizeof(SMsgHead), bodyLen, &req) < 0) {
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = totalLen;
  return pBuf;
}
3872

3873
/*
 * Append one TDMT_VND_SSMIGRATE redo action for pVgroup to pTrans.
 *
 * The action targets pVgroup's epset and carries the request built by mndBuildSsMigrateVgroupReq.
 * If appending fails, the request buffer is freed here. On success it is not freed here; the
 * transaction presumably takes ownership (TODO confirm).
 *
 * Returns 0 or an error code (terrno mirrors it via TAOS_RETURN).
 */
static int32_t mndAddSsMigrateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SSsMigrateObj* pMigrateObj) {
  int32_t      code = 0;
  STransAction action = {.epSet = mndGetVgroupEpset(pMnode, pVgroup)};

  int32_t contLen = 0;
  void   *pMsg = mndBuildSsMigrateVgroupReq(pMnode, pDb, pVgroup, &contLen, pMigrateObj);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = contLen;
  action.msgType = TDMT_VND_SSMIGRATE;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
3897

3898
/*
 * Add the ssmigrate redo action for pVgroup to pTrans.
 * Returns 0 on success; on failure returns the error code and also stores it in terrno.
 */
int32_t mndBuildSsMigrateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SSsMigrateObj* pMigrateObj) {
  int32_t code = mndAddSsMigrateVgroupAction(pMnode, pTrans, pDb, pVgroup, pMigrateObj);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc