• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.11
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndArbGroup.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndStream.h"
26
#include "mndTopic.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "tmisce.h"
31

32
#define VGROUP_VER_NUMBER   1
33
#define VGROUP_RESERVE_SIZE 64
34

35
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
36
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
38
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
39

40
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
42
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
44

45
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
46
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
49

50
int32_t mndInitVgroup(SMnode *pMnode) {
1,748✔
51
  SSdbTable table = {
1,748✔
52
      .sdbType = SDB_VGROUP,
53
      .keyType = SDB_KEY_INT32,
54
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
55
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
56
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
57
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
58
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
59
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
60
  };
61

62
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
1,748✔
63
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
1,748✔
64
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
1,748✔
65
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
1,748✔
66
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
1,748✔
67
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
1,748✔
68
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
1,748✔
69
  mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
1,748✔
70
  mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
1,748✔
71
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
1,748✔
72
  mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
1,748✔
73
  mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);
1,748✔
74

75
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
1,748✔
76
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
1,748✔
77
  // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg);
78
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
1,748✔
79
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
1,748✔
80

81
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
1,748✔
82
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
1,748✔
83
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
1,748✔
84
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);
1,748✔
85

86
  return sdbSetTable(pMnode->pSdb, table);
1,748✔
87
}
88

89
void mndCleanupVgroup(SMnode *pMnode) {}
1,747✔
90

91
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
40,780✔
92
  int32_t code = 0;
40,780✔
93
  int32_t lino = 0;
40,780✔
94
  terrno = TSDB_CODE_OUT_OF_MEMORY;
40,780✔
95

96
  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
40,780✔
97
  if (pRaw == NULL) goto _OVER;
40,780!
98

99
  int32_t dataPos = 0;
40,780✔
100
  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
40,780!
101
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
40,780!
102
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
40,780!
103
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
40,780!
104
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
40,780!
105
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
40,780!
106
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
40,780!
107
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
40,780!
108
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
40,780!
109
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
40,780!
110
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
86,644✔
111
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
45,864✔
112
    SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER)
45,864!
113
  }
114
  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
40,780!
115
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
40,780!
116
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
40,780!
117

118
  terrno = 0;
40,780✔
119

120
_OVER:
40,780✔
121
  if (terrno != 0) {
40,780!
122
    mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
×
123
    sdbFreeRaw(pRaw);
×
124
    return NULL;
×
125
  }
126

127
  mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
40,780✔
128
  return pRaw;
40,780✔
129
}
130

131
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
34,141✔
132
  int32_t code = 0;
34,141✔
133
  int32_t lino = 0;
34,141✔
134
  terrno = TSDB_CODE_OUT_OF_MEMORY;
34,141✔
135
  SSdbRow *pRow = NULL;
34,141✔
136
  SVgObj  *pVgroup = NULL;
34,141✔
137

138
  int8_t sver = 0;
34,141✔
139
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
34,141!
140

141
  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
34,141!
142
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
143
    goto _OVER;
×
144
  }
145

146
  pRow = sdbAllocRow(sizeof(SVgObj));
34,141✔
147
  if (pRow == NULL) goto _OVER;
34,141!
148

149
  pVgroup = sdbGetRowObj(pRow);
34,141✔
150
  if (pVgroup == NULL) goto _OVER;
34,141!
151

152
  int32_t dataPos = 0;
34,141✔
153
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
34,141!
154
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
34,141!
155
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
34,141!
156
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
34,141!
157
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
34,141!
158
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
34,141!
159
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
34,141!
160
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
34,141!
161
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
34,141!
162
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
34,141!
163
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
74,170✔
164
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
40,029✔
165
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
40,029!
166
    if (pVgroup->replica == 1) {
40,029✔
167
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
31,168✔
168
    }
169
  }
170
  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
34,141!
171
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
34,141!
172
  }
173

174
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
34,141!
175

176
  terrno = 0;
34,141✔
177

178
_OVER:
34,141✔
179
  if (terrno != 0) {
34,141!
180
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
×
181
    taosMemoryFreeClear(pRow);
×
182
    return NULL;
×
183
  }
184

185
  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
34,141✔
186
  return pRow;
34,141✔
187
}
188

189
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
9,433✔
190
  SSdb    *pSdb = pMnode->pSdb;
9,433✔
191
  SSdbRow *pRow = NULL;
9,433✔
192
  SVgObj  *pVgroup = NULL;
9,433✔
193
  int      code = -1;
9,433✔
194

195
  pRow = mndVgroupActionDecode(pRaw);
9,433✔
196
  if (pRow == NULL) {
9,433!
197
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
198
    if (terrno != 0) code = terrno;
×
199
    goto _OVER;
×
200
  }
201
  pVgroup = sdbGetRowObj(pRow);
9,433✔
202
  if (pVgroup == NULL) {
9,433!
203
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
204
    if (terrno != 0) code = terrno;
×
205
    goto _OVER;
×
206
  }
207

208
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
9,433✔
209
  if (maxVgId > pVgroup->vgId) {
9,433!
210
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
×
211
    goto _OVER;
×
212
  }
213

214
  code = 0;
9,433✔
215
_OVER:
9,433✔
216
  if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
9,433!
217
  taosMemoryFreeClear(pRow);
9,433!
218
  TAOS_RETURN(code);
9,433✔
219
}
220

221
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
10,310✔
222
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
10,310✔
223
  return 0;
10,310✔
224
}
225

226
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
34,141✔
227
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
34,141✔
228
  return 0;
34,141✔
229
}
230

231
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
9,965✔
232
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
9,965✔
233
  pOld->updateTime = pNew->updateTime;
9,965✔
234
  pOld->version = pNew->version;
9,965✔
235
  pOld->hashBegin = pNew->hashBegin;
9,965✔
236
  pOld->hashEnd = pNew->hashEnd;
9,965✔
237
  pOld->replica = pNew->replica;
9,965✔
238
  pOld->isTsma = pNew->isTsma;
9,965✔
239
  for (int32_t i = 0; i < pNew->replica; ++i) {
22,265✔
240
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
12,300✔
241
    for (int32_t j = 0; j < pOld->replica; ++j) {
31,748✔
242
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
19,448✔
243
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
19,448✔
244
        pNewGid->syncState = pOldGid->syncState;
11,913✔
245
        pNewGid->syncRestore = pOldGid->syncRestore;
11,913✔
246
        pNewGid->syncCanRead = pOldGid->syncCanRead;
11,913✔
247
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
11,913✔
248
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
11,913✔
249
      }
250
    }
251
  }
252
  pNew->numOfTables = pOld->numOfTables;
9,965✔
253
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
9,965✔
254
  pNew->totalStorage = pOld->totalStorage;
9,965✔
255
  pNew->compStorage = pOld->compStorage;
9,965✔
256
  pNew->pointsWritten = pOld->pointsWritten;
9,965✔
257
  pNew->compact = pOld->compact;
9,965✔
258
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
9,965✔
259
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
9,965✔
260
  return 0;
9,965✔
261
}
262

263
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
1,745,193✔
264
  SSdb   *pSdb = pMnode->pSdb;
1,745,193✔
265
  SVgObj *pVgroup = sdbAcquire(pSdb, SDB_VGROUP, &vgId);
1,745,193✔
266
  if (pVgroup == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
1,745,194✔
267
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
538,474✔
268
  }
269
  return pVgroup;
1,745,194✔
270
}
271

272
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
1,212,977✔
273
  SSdb *pSdb = pMnode->pSdb;
1,212,977✔
274
  sdbRelease(pSdb, pVgroup);
1,212,977✔
275
}
1,212,977✔
276

277
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
9,708✔
278
  SCreateVnodeReq createReq = {0};
9,708✔
279
  createReq.vgId = pVgroup->vgId;
9,708✔
280
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
9,708✔
281
  createReq.dbUid = pDb->uid;
9,708✔
282
  createReq.vgVersion = pVgroup->version;
9,708✔
283
  createReq.numOfStables = pDb->cfg.numOfStables;
9,708✔
284
  createReq.buffer = pDb->cfg.buffer;
9,708✔
285
  createReq.pageSize = pDb->cfg.pageSize;
9,708✔
286
  createReq.pages = pDb->cfg.pages;
9,708✔
287
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
9,708✔
288
  createReq.daysPerFile = pDb->cfg.daysPerFile;
9,708✔
289
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
9,708✔
290
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
9,708✔
291
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
9,708✔
292
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
9,708✔
293
  createReq.s3ChunkSize = pDb->cfg.s3ChunkSize;
9,708✔
294
  createReq.s3KeepLocal = pDb->cfg.s3KeepLocal;
9,708✔
295
  createReq.s3Compact = pDb->cfg.s3Compact;
9,708✔
296
  createReq.minRows = pDb->cfg.minRows;
9,708✔
297
  createReq.maxRows = pDb->cfg.maxRows;
9,708✔
298
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
9,708✔
299
  createReq.walLevel = pDb->cfg.walLevel;
9,708✔
300
  createReq.precision = pDb->cfg.precision;
9,708✔
301
  createReq.compression = pDb->cfg.compression;
9,708✔
302
  createReq.strict = pDb->cfg.strict;
9,708✔
303
  createReq.cacheLast = pDb->cfg.cacheLast;
9,708✔
304
  createReq.replica = 0;
9,708✔
305
  createReq.learnerReplica = 0;
9,708✔
306
  createReq.selfIndex = -1;
9,708✔
307
  createReq.learnerSelfIndex = -1;
9,708✔
308
  createReq.hashBegin = pVgroup->hashBegin;
9,708✔
309
  createReq.hashEnd = pVgroup->hashEnd;
9,708✔
310
  createReq.hashMethod = pDb->cfg.hashMethod;
9,708✔
311
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
9,708✔
312
  createReq.pRetensions = pDb->cfg.pRetensions;
9,708✔
313
  createReq.isTsma = pVgroup->isTsma;
9,708✔
314
  createReq.pTsma = pVgroup->pTsma;
9,708✔
315
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
9,708✔
316
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
9,708✔
317
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
9,708✔
318
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
9,708✔
319
  createReq.sstTrigger = pDb->cfg.sstTrigger;
9,708✔
320
  createReq.hashPrefix = pDb->cfg.hashPrefix;
9,708✔
321
  createReq.hashSuffix = pDb->cfg.hashSuffix;
9,708✔
322
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
9,708✔
323
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);
9,708✔
324
  createReq.encryptAlgorithm = pDb->cfg.encryptAlgorithm;
9,708✔
325
  int32_t code = 0;
9,708✔
326

327
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
22,182✔
328
    SReplica *pReplica = NULL;
12,474✔
329

330
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
12,474✔
331
      pReplica = &createReq.replicas[createReq.replica];
12,296✔
332
    } else {
333
      pReplica = &createReq.learnerReplicas[createReq.learnerReplica];
178✔
334
    }
335

336
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
12,474✔
337
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
12,474✔
338
    if (pVgidDnode == NULL) {
12,474!
339
      return NULL;
×
340
    }
341

342
    pReplica->id = pVgidDnode->id;
12,474✔
343
    pReplica->port = pVgidDnode->port;
12,474✔
344
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
12,474✔
345
    mndReleaseDnode(pMnode, pVgidDnode);
12,474✔
346

347
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
12,474✔
348
      if (pDnode->id == pVgid->dnodeId) {
12,296✔
349
        createReq.selfIndex = createReq.replica;
9,530✔
350
      }
351
    } else {
352
      if (pDnode->id == pVgid->dnodeId) {
178!
353
        createReq.learnerSelfIndex = createReq.learnerReplica;
178✔
354
      }
355
    }
356

357
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
12,474✔
358
      createReq.replica++;
12,296✔
359
    } else {
360
      createReq.learnerReplica++;
178✔
361
    }
362
  }
363

364
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
9,708!
365
    terrno = TSDB_CODE_APP_ERROR;
×
366
    return NULL;
×
367
  }
368

369
  createReq.changeVersion = pVgroup->syncConfChangeVer;
9,708✔
370

371
  mInfo(
9,708!
372
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
373
      "changeVersion:%d",
374
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
375
      createReq.strict, createReq.changeVersion);
376
  for (int32_t i = 0; i < createReq.replica; ++i) {
22,004✔
377
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
12,296!
378
  }
379
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
9,886✔
380
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
178!
381
          createReq.learnerReplicas[i].port);
382
  }
383

384
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
9,708✔
385
  if (contLen < 0) {
9,708!
386
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
387
    return NULL;
×
388
  }
389

390
  void *pReq = taosMemoryMalloc(contLen);
9,708!
391
  if (pReq == NULL) {
9,708!
392
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
393
    return NULL;
×
394
  }
395

396
  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
9,708✔
397
  if (code < 0) {
9,708!
398
    terrno = TSDB_CODE_APP_ERROR;
×
399
    taosMemoryFree(pReq);
×
400
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
×
401
    return NULL;
×
402
  }
403
  *pContLen = contLen;
9,708✔
404
  return pReq;
9,708✔
405
}
406

407
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
413✔
408
  SAlterVnodeConfigReq alterReq = {0};
413✔
409
  alterReq.vgVersion = pVgroup->version;
413✔
410
  alterReq.buffer = pDb->cfg.buffer;
413✔
411
  alterReq.pageSize = pDb->cfg.pageSize;
413✔
412
  alterReq.pages = pDb->cfg.pages;
413✔
413
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
413✔
414
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
413✔
415
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
413✔
416
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
413✔
417
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
413✔
418
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
413✔
419
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
413✔
420
  alterReq.walLevel = pDb->cfg.walLevel;
413✔
421
  alterReq.strict = pDb->cfg.strict;
413✔
422
  alterReq.cacheLast = pDb->cfg.cacheLast;
413✔
423
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
413✔
424
  alterReq.minRows = pDb->cfg.minRows;
413✔
425
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
413✔
426
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
413✔
427
  alterReq.s3KeepLocal = pDb->cfg.s3KeepLocal;
413✔
428
  alterReq.s3Compact = pDb->cfg.s3Compact;
413✔
429

430
  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
413!
431
  int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
413✔
432
  if (contLen < 0) {
413!
433
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
434
    return NULL;
×
435
  }
436
  contLen += sizeof(SMsgHead);
413✔
437

438
  void *pReq = taosMemoryMalloc(contLen);
413!
439
  if (pReq == NULL) {
413!
440
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
441
    return NULL;
×
442
  }
443

444
  SMsgHead *pHead = pReq;
413✔
445
  pHead->contLen = htonl(contLen);
413✔
446
  pHead->vgId = htonl(pVgroup->vgId);
413✔
447

448
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq) < 0) {
413!
449
    taosMemoryFree(pReq);
×
450
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
×
451
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
452
    return NULL;
×
453
  }
454
  *pContLen = contLen;
413✔
455
  return pReq;
413✔
456
}
457

458
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
1,316✔
459
                                          int32_t *pContLen) {
460
  SAlterVnodeReplicaReq alterReq = {
1,316✔
461
      .vgId = pVgroup->vgId,
1,316✔
462
      .strict = pDb->cfg.strict,
1,316✔
463
      .replica = 0,
464
      .learnerReplica = 0,
465
      .selfIndex = -1,
466
      .learnerSelfIndex = -1,
467
      .changeVersion = ++(pVgroup->syncConfChangeVer),
1,316✔
468
  };
469

470
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
5,354✔
471
    SReplica *pReplica = NULL;
4,038✔
472

473
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
4,038✔
474
      pReplica = &alterReq.replicas[alterReq.replica];
3,726✔
475
      alterReq.replica++;
3,726✔
476
    } else {
477
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
312✔
478
      alterReq.learnerReplica++;
312✔
479
    }
480

481
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
4,038✔
482
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
4,038✔
483
    if (pVgidDnode == NULL) return NULL;
4,038!
484

485
    pReplica->id = pVgidDnode->id;
4,038✔
486
    pReplica->port = pVgidDnode->port;
4,038✔
487
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
4,038✔
488
    mndReleaseDnode(pMnode, pVgidDnode);
4,038✔
489

490
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
4,038✔
491
      if (dnodeId == pVgid->dnodeId) {
3,726✔
492
        alterReq.selfIndex = v;
1,316✔
493
      }
494
    } else {
495
      if (dnodeId == pVgid->dnodeId) {
312!
496
        alterReq.learnerSelfIndex = v;
×
497
      }
498
    }
499
  }
500

501
  mInfo(
1,316!
502
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
503
      "changeVersion:%d",
504
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
505
      alterReq.strict, alterReq.changeVersion);
506
  for (int32_t i = 0; i < alterReq.replica; ++i) {
5,042✔
507
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
3,726!
508
  }
509
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
1,628✔
510
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
312!
511
          alterReq.learnerReplicas[i].port);
512
  }
513

514
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
1,316!
515
    terrno = TSDB_CODE_APP_ERROR;
×
516
    return NULL;
×
517
  }
518

519
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
1,316✔
520
  if (contLen < 0) {
1,316!
521
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
522
    return NULL;
×
523
  }
524

525
  void *pReq = taosMemoryMalloc(contLen);
1,316!
526
  if (pReq == NULL) {
1,316!
527
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
528
    return NULL;
×
529
  }
530

531
  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
1,316!
532
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
×
533
    taosMemoryFree(pReq);
×
534
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
535
    return NULL;
×
536
  }
537
  *pContLen = contLen;
1,316✔
538
  return pReq;
1,316✔
539
}
540

541
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
×
542
                                          int32_t *pContLen) {
543
  SCheckLearnCatchupReq req = {
×
544
      .vgId = pVgroup->vgId,
×
545
      .strict = pDb->cfg.strict,
×
546
      .replica = 0,
547
      .learnerReplica = 0,
548
      .selfIndex = -1,
549
      .learnerSelfIndex = -1,
550
  };
551

552
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
553
    SReplica *pReplica = NULL;
×
554

555
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
556
      pReplica = &req.replicas[req.replica];
×
557
      req.replica++;
×
558
    } else {
559
      pReplica = &req.learnerReplicas[req.learnerReplica];
×
560
      req.learnerReplica++;
×
561
    }
562

563
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
564
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
565
    if (pVgidDnode == NULL) return NULL;
×
566

567
    pReplica->id = pVgidDnode->id;
×
568
    pReplica->port = pVgidDnode->port;
×
569
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
×
570
    mndReleaseDnode(pMnode, pVgidDnode);
×
571

572
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
573
      if (dnodeId == pVgid->dnodeId) {
×
574
        req.selfIndex = v;
×
575
      }
576
    } else {
577
      if (dnodeId == pVgid->dnodeId) {
×
578
        req.learnerSelfIndex = v;
×
579
      }
580
    }
581
  }
582

583
  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
×
584
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
585
  for (int32_t i = 0; i < req.replica; ++i) {
×
586
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
×
587
  }
588
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
×
589
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
×
590
  }
591

592
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
×
593
    terrno = TSDB_CODE_APP_ERROR;
×
594
    return NULL;
×
595
  }
596

597
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
×
598
  if (contLen < 0) {
×
599
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
600
    return NULL;
×
601
  }
602

603
  void *pReq = taosMemoryMalloc(contLen);
×
604
  if (pReq == NULL) {
×
605
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
606
    return NULL;
×
607
  }
608

609
  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
×
610
    mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
×
611
    taosMemoryFree(pReq);
×
612
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
613
    return NULL;
×
614
  }
615
  *pContLen = contLen;
×
616
  return pReq;
×
617
}
618

619
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
46✔
620
  SDisableVnodeWriteReq disableReq = {
46✔
621
      .vgId = vgId,
622
      .disable = 1,
623
  };
624

625
  mInfo("vgId:%d, build disable vnode write req", vgId);
46!
626
  int32_t contLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
46✔
627
  if (contLen < 0) {
46!
628
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
629
    return NULL;
×
630
  }
631

632
  void *pReq = taosMemoryMalloc(contLen);
46!
633
  if (pReq == NULL) {
46!
634
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
635
    return NULL;
×
636
  }
637

638
  if (tSerializeSDisableVnodeWriteReq(pReq, contLen, &disableReq) < 0) {
46!
639
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
×
640
    taosMemoryFree(pReq);
×
641
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
642
    return NULL;
×
643
  }
644
  *pContLen = contLen;
46✔
645
  return pReq;
46✔
646
}
647

648
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
46✔
649
  SAlterVnodeHashRangeReq alterReq = {
46✔
650
      .srcVgId = srcVgId,
651
      .dstVgId = pVgroup->vgId,
46✔
652
      .hashBegin = pVgroup->hashBegin,
46✔
653
      .hashEnd = pVgroup->hashEnd,
46✔
654
      .changeVersion = ++(pVgroup->syncConfChangeVer),
46✔
655
  };
656

657
  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
46!
658
        pVgroup->hashBegin, pVgroup->hashEnd);
659
  int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
46✔
660
  if (contLen < 0) {
46!
661
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
662
    return NULL;
×
663
  }
664

665
  void *pReq = taosMemoryMalloc(contLen);
46!
666
  if (pReq == NULL) {
46!
667
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
668
    return NULL;
×
669
  }
670

671
  if (tSerializeSAlterVnodeHashRangeReq(pReq, contLen, &alterReq) < 0) {
46!
672
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
×
673
    taosMemoryFree(pReq);
×
674
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
675
    return NULL;
×
676
  }
677
  *pContLen = contLen;
46✔
678
  return pReq;
46✔
679
}
680

681
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
14,094✔
682
  SDropVnodeReq dropReq = {0};
14,094✔
683
  dropReq.dnodeId = pDnode->id;
14,094✔
684
  dropReq.vgId = pVgroup->vgId;
14,094✔
685
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);
14,094✔
686
  dropReq.dbUid = pDb->uid;
14,094✔
687

688
  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);
14,094!
689
  int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
14,094✔
690
  if (contLen < 0) {
14,094!
691
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
692
    return NULL;
×
693
  }
694

695
  void *pReq = taosMemoryMalloc(contLen);
14,094!
696
  if (pReq == NULL) {
14,094!
697
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
698
    return NULL;
×
699
  }
700

701
  if (tSerializeSDropVnodeReq(pReq, contLen, &dropReq) < 0) {
14,094!
702
    mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
×
703
    taosMemoryFree(pReq);
×
704
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
705
    return NULL;
×
706
  }
707
  *pContLen = contLen;
14,094✔
708
  return pReq;
14,094✔
709
}
710

711
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
5,976✔
712
  SDnodeObj *pDnode = pObj;
5,976✔
713
  pDnode->numOfVnodes = 0;
5,976✔
714
  pDnode->numOfOtherNodes = 0;
5,976✔
715
  return true;
5,976✔
716
}
717

718
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
5,976✔
719
  SDnodeObj *pDnode = pObj;
5,976✔
720
  SArray    *pArray = p1;
5,976✔
721
  int32_t    exceptDnodeId = *(int32_t *)p2;
5,976✔
722
  SArray    *dnodeList = p3;
5,976✔
723

724
  if (exceptDnodeId == pDnode->id) {
5,976✔
725
    return true;
16✔
726
  }
727

728
  if (dnodeList != NULL) {
5,960✔
729
    int32_t dnodeListSize = taosArrayGetSize(dnodeList);
5,560✔
730
    if (dnodeListSize > 0) {
5,560✔
731
      bool inDnodeList = false;
95✔
732
      for (int32_t index = 0; index < dnodeListSize; ++index) {
310✔
733
        int32_t dnodeId = *(int32_t *)taosArrayGet(dnodeList, index);
215✔
734
        if (pDnode->id == dnodeId) {
215✔
735
          inDnodeList = true;
43✔
736
        }
737
      }
738
      if (!inDnodeList) {
95✔
739
        return true;
52✔
740
      }
741
    }
742
  }
743

744
  int64_t curMs = taosGetTimestampMs();
5,908✔
745
  bool    online = mndIsDnodeOnline(pDnode, curMs);
5,908✔
746
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
5,908✔
747
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
5,908✔
748
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);
5,908✔
749

750
  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
5,908!
751
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
752

753
  if (isMnode) {
5,908✔
754
    pDnode->numOfOtherNodes++;
4,886✔
755
  }
756

757
  if (online && pDnode->numOfSupportVnodes > 0) {
5,908✔
758
    if (taosArrayPush(pArray, pDnode) == NULL) return false;
5,269!
759
  }
760
  return true;
5,908✔
761
}
762

763
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
4,599✔
764
  SSdb   *pSdb = pMnode->pSdb;
4,599✔
765
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
4,599✔
766

767
  SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
4,599✔
768
  if (pArray == NULL) {
4,599!
769
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
770
    return NULL;
×
771
  }
772

773
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
4,599✔
774
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, dnodeList);
4,599✔
775

776
  mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray));
4,599✔
777
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
9,868✔
778
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
5,269✔
779
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
5,269✔
780
  }
781
  return pArray;
4,599✔
782
}
783

784
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
×
785
  if (*dnode1Id == *dnode2Id) {
×
786
    return 0;
×
787
  }
788
  return *dnode1Id > *dnode2Id ? 1 : -1;
×
789
}
790

791
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
18,769✔
792
  float totalDnodes = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
18,769✔
793
  return totalDnodes / pDnode->numOfSupportVnodes;
18,769✔
794
}
795

796
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
4,674✔
797
  float d1Score = mndGetDnodeScore(pDnode1, 0, 0.9);
4,674✔
798
  float d2Score = mndGetDnodeScore(pDnode2, 0, 0.9);
4,674✔
799
  if (d1Score == d2Score) {
4,674✔
800
    return 0;
1,719✔
801
  }
802
  return d1Score > d2Score ? 1 : -1;
2,955✔
803
}
804

805
void mndSortVnodeGid(SVgObj *pVgroup) {
8,819✔
806
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
18,569✔
807
    for (int32_t j = 0; j < pVgroup->replica - 1 - i; ++j) {
11,143✔
808
      if (pVgroup->vnodeGid[j].dnodeId > pVgroup->vnodeGid[j + 1].dnodeId) {
1,393✔
809
        TSWAP(pVgroup->vnodeGid[j], pVgroup->vnodeGid[j + 1]);
457✔
810
      }
811
    }
812
  }
813
}
8,819✔
814

815
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
9,250✔
816
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
9,250✔
817
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
9,250✔
818
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
20,752✔
819
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
11,502✔
820
    mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9));
11,502✔
821
  }
822

823
  int32_t size = taosArrayGetSize(pArray);
9,250✔
824
  if (size < pVgroup->replica) {
9,250✔
825
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
476!
826
           pVgroup->replica);
827
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
476✔
828
  }
829

830
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
18,397✔
831
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
9,623✔
832
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
9,623✔
833
    if (pDnode == NULL) {
9,623!
834
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
×
835
    }
836
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
9,623!
837
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
×
838
    }
839

840
    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
9,623✔
841
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
9,623!
842
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
×
843
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
844
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
845
    } else {
846
      pDnode->memUsed += vgMem;
9,623✔
847
    }
848

849
    pVgid->dnodeId = pDnode->id;
9,623✔
850
    if (pVgroup->replica == 1) {
9,623✔
851
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
8,346✔
852
    } else {
853
      pVgid->syncState = TAOS_SYNC_STATE_FOLLOWER;
1,277✔
854
    }
855

856
    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
9,623!
857
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
858
    pDnode->numOfVnodes++;
9,623✔
859
  }
860

861
  mndSortVnodeGid(pVgroup);
8,774✔
862
  return 0;
8,774✔
863
}
864

UNCOV
865
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
866
  int32_t code = 0;
×
UNCOV
867
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
×
UNCOV
868
  if (pArray == NULL) {
×
869
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
870
    if (terrno != 0) code = terrno;
×
871
    TAOS_RETURN(code);
×
872
  }
873

UNCOV
874
  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
×
UNCOV
875
  pVgroup->isTsma = 1;
×
UNCOV
876
  pVgroup->createdTime = taosGetTimestampMs();
×
UNCOV
877
  pVgroup->updateTime = pVgroup->createdTime;
×
UNCOV
878
  pVgroup->version = 1;
×
UNCOV
879
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
880
  pVgroup->dbUid = pDb->uid;
×
UNCOV
881
  pVgroup->replica = 1;
×
882

UNCOV
883
  if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
×
UNCOV
884
  taosArrayDestroy(pArray);
×
885

UNCOV
886
  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
×
UNCOV
887
  return 0;
×
888
}
889

890
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
4,372✔
891
  int32_t code = -1;
4,372✔
892
  SArray *pArray = NULL;
4,372✔
893
  SVgObj *pVgroups = NULL;
4,372✔
894

895
  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
4,372!
896
  if (pVgroups == NULL) {
4,372!
897
    code = terrno;
×
898
    goto _OVER;
×
899
  }
900

901
  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
4,372✔
902
  if (pArray == NULL) {
4,372!
903
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
904
    if (terrno != 0) code = terrno;
×
905
    goto _OVER;
×
906
  }
907

908
  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
4,372!
909
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);
910

911
  int32_t  allocedVgroups = 0;
4,372✔
912
  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
4,372✔
913
  uint32_t hashMin = 0;
4,372✔
914
  uint32_t hashMax = UINT32_MAX;
4,372✔
915
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;
4,372✔
916

917
  if (maxVgId < 2) maxVgId = 2;
4,372✔
918

919
  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
13,146✔
920
    SVgObj *pVgroup = &pVgroups[v];
9,250✔
921
    pVgroup->vgId = maxVgId++;
9,250✔
922
    pVgroup->createdTime = taosGetTimestampMs();
9,250✔
923
    pVgroup->updateTime = pVgroups->createdTime;
9,250✔
924
    pVgroup->version = 1;
9,250✔
925
    pVgroup->hashBegin = hashMin + hashInterval * v;
9,250✔
926
    if (v == pDb->cfg.numOfVgroups - 1) {
9,250✔
927
      pVgroup->hashEnd = hashMax;
4,014✔
928
    } else {
929
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
5,236✔
930
    }
931

932
    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
9,250✔
933
    pVgroup->dbUid = pDb->uid;
9,250✔
934
    pVgroup->replica = pDb->cfg.replications;
9,250✔
935

936
    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
9,250✔
937
      goto _OVER;
476✔
938
    }
939

940
    allocedVgroups++;
8,774✔
941
  }
942

943
  *ppVgroups = pVgroups;
3,896✔
944
  code = 0;
3,896✔
945

946
  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
3,896!
947

948
_OVER:
×
949
  if (code != 0) taosMemoryFree(pVgroups);
4,372!
950
  taosArrayDestroy(pArray);
4,372✔
951
  TAOS_RETURN(code);
4,372✔
952
}
953

954
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
537,020✔
955
  SEpSet epset = {0};
537,020✔
956

957
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
1,118,226✔
958
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
581,206✔
959
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
581,206✔
960
    if (pDnode == NULL) continue;
581,206✔
961

962
    if (pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
581,061!
963
      epset.inUse = epset.numOfEps;
532,006✔
964
    }
965

966
    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
581,061!
967
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
×
968
    }
969
    mndReleaseDnode(pMnode, pDnode);
581,061✔
970
  }
971
  epsetSort(&epset);
537,020✔
972

973
  return epset;
537,020✔
974
}
975

976
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
16✔
977
  SEpSet epset = {0};
16✔
978

979
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
16✔
980
  if (!pVgroup) return epset;
16!
981

982
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
48✔
983
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
32✔
984
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
32✔
985
    if (pDnode == NULL) continue;
32!
986

987
    if (pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
32!
988
      epset.inUse = epset.numOfEps;
4✔
989
    }
990

991
    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
32!
992
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
×
993
    }
994
    mndReleaseDnode(pMnode, pDnode);
32✔
995
  }
996

997
  mndReleaseVgroup(pMnode, pVgroup);
16✔
998
  return epset;
16✔
999
}
1000

1001
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
13,519✔
1002
  SMnode *pMnode = pReq->info.node;
13,519✔
1003
  SSdb   *pSdb = pMnode->pSdb;
13,519✔
1004
  int32_t numOfRows = 0;
13,519✔
1005
  SVgObj *pVgroup = NULL;
13,519✔
1006
  int32_t cols = 0;
13,519✔
1007
  int64_t curMs = taosGetTimestampMs();
13,519✔
1008
  int32_t code = 0;
13,519✔
1009

1010
  SDbObj *pDb = NULL;
13,519✔
1011
  if (strlen(pShow->db) > 0) {
13,519✔
1012
    pDb = mndAcquireDb(pMnode, pShow->db);
523✔
1013
    if (pDb == NULL) {
523!
1014
      return 0;
×
1015
    }
1016
  }
1017

1018
  while (numOfRows < rows) {
676,619✔
1019
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
676,606✔
1020
    if (pShow->pIter == NULL) break;
676,506✔
1021

1022
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
662,982✔
1023
      sdbRelease(pSdb, pVgroup);
136✔
1024
      continue;
136✔
1025
    }
1026

1027
    cols = 0;
662,846✔
1028
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
662,846✔
1029
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
662,532✔
1030
    if (code != 0) {
661,583!
1031
      mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
×
1032
      return code;
×
1033
    }
1034

1035
    SName name = {0};
661,583✔
1036
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
661,583✔
1037
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
661,583✔
1038
    if (code != 0) {
662,800!
1039
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1040
      return code;
×
1041
    }
1042
    (void)tNameGetDbName(&name, varDataVal(db));
662,800✔
1043
    varDataSetLen(db, strlen(varDataVal(db)));
662,768✔
1044

1045
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
662,768✔
1046
    code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
662,384✔
1047
    if (code != 0) {
660,929!
1048
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1049
      return code;
×
1050
    }
1051

1052
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
660,929✔
1053
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
660,585✔
1054
    if (code != 0) {
660,006!
1055
      mError("vgId:%d, failed to set numOfTables, since %s", pVgroup->vgId, tstrerror(code));
×
1056
      return code;
×
1057
    }
1058

1059
    // default 3 replica, add 1 replica if move vnode
1060
    for (int32_t i = 0; i < 4; ++i) {
3,291,076✔
1061
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,611,326✔
1062
      if (i < pVgroup->replica) {
2,609,556✔
1063
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
637,572✔
1064
        code = colDataSetVal(pColInfo, numOfRows, (const char *)&dnodeId, false);
637,572✔
1065
        if (code != 0) {
661,410!
1066
          mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
×
1067
          return code;
×
1068
        }
1069

1070
        bool       exist = false;
661,410✔
1071
        bool       online = false;
661,410✔
1072
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
661,410✔
1073
        if (pDnode != NULL) {
664,061!
1074
          exist = true;
664,062✔
1075
          online = mndIsDnodeOnline(pDnode, curMs);
664,062✔
1076
          mndReleaseDnode(pMnode, pDnode);
663,964✔
1077
        }
1078

1079
        char buf1[20] = {0};
664,260✔
1080
        char role[20] = "offline";
664,260✔
1081
        if (!exist) {
664,260!
1082
          tstrncpy(role, "dropping", sizeof(role));
×
1083
        } else if (online) {
664,260✔
1084
          char *star = "";
664,123✔
1085
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
664,123✔
1086
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,364!
1087
            if (!pVgroup->vnodeGid[i].syncRestore && !pVgroup->vnodeGid[i].syncCanRead) {
662,759!
1088
              star = "**";
302✔
1089
            } else if (!pVgroup->vnodeGid[i].syncRestore && pVgroup->vnodeGid[i].syncCanRead) {
662,457!
1090
              star = "*";
×
1091
            } else {
1092
            }
1093
          }
1094
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
664,123✔
1095
          /*
1096
          mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
1097

1098
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
1099
            if(pVgroup->vnodeGid[i].learnerProgress < 0){
1100
              snprintf(role, sizeof(role), "%s-",
1101
                syncStr(pVgroup->vnodeGid[i].syncState));
1102

1103
            }
1104
            else if(pVgroup->vnodeGid[i].learnerProgress >= 100){
1105
              snprintf(role, sizeof(role), "%s--",
1106
                syncStr(pVgroup->vnodeGid[i].syncState));
1107
            }
1108
            else{
1109
              snprintf(role, sizeof(role), "%s%d",
1110
                syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress);
1111
            }
1112
          }
1113
          else{
1114
            snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
1115
          }
1116
          */
1117
        } else {
1118
        }
1119
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
664,211✔
1120

1121
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
664,211✔
1122
        code = colDataSetVal(pColInfo, numOfRows, (const char *)buf1, false);
663,750✔
1123
        if (code != 0) {
662,321!
1124
          mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code));
×
1125
          return code;
×
1126
        }
1127

1128
        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
662,321✔
1129
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
662,321✔
1130
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
662,321✔
1131
                 pVgroup->vnodeGid[i].syncCommitIndex);
662,321✔
1132
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);
662,321✔
1133

1134
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
662,321✔
1135
        code = colDataSetVal(pColInfo, numOfRows, (const char *)&buf, false);
663,492✔
1136
        if (code != 0) {
661,989!
1137
          mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code));
×
1138
          return code;
×
1139
        }
1140
      } else {
1141
        colDataSetNULL(pColInfo, numOfRows);
1,971,984✔
1142
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,971,984✔
1143
        colDataSetNULL(pColInfo, numOfRows);
1,969,918!
1144
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,969,918✔
1145
        colDataSetNULL(pColInfo, numOfRows);
1,969,081!
1146
      }
1147
    }
1148

1149
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
679,750✔
1150
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
660,685✔
1151
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
660,685✔
1152
    if (code != 0) {
660,148!
1153
      mError("vgId:%d, failed to set cacheUsage, since %s", pVgroup->vgId, tstrerror(code));
×
1154
      return code;
×
1155
    }
1156

1157
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
660,148✔
1158
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false);
659,620✔
1159
    if (code != 0) {
659,381!
1160
      mError("vgId:%d, failed to set numOfCachedTables, since %s", pVgroup->vgId, tstrerror(code));
×
1161
      return code;
×
1162
    }
1163

1164
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
659,381✔
1165
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
658,928✔
1166
    if (code != 0) {
659,218!
1167
      mError("vgId:%d, failed to set isTsma, since %s", pVgroup->vgId, tstrerror(code));
×
1168
      return code;
×
1169
    }
1170
    numOfRows++;
659,218✔
1171
    sdbRelease(pSdb, pVgroup);
659,218✔
1172
  }
1173

1174
  if (pDb != NULL) {
13,537✔
1175
    mndReleaseDb(pMnode, pDb);
523✔
1176
  }
1177

1178
  pShow->numOfRows += numOfRows;
13,524✔
1179
  return numOfRows;
13,524✔
1180
}
1181

1182
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
×
1183
  SSdb *pSdb = pMnode->pSdb;
×
1184
  sdbCancelFetchByType(pSdb, pIter, SDB_VGROUP);
×
1185
}
×
1186

1187
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
338,443✔
1188
  SVgObj  *pVgroup = pObj;
338,443✔
1189
  int32_t  dnodeId = *(int32_t *)p1;
338,443✔
1190
  int32_t *pNumOfVnodes = (int32_t *)p2;
338,443✔
1191

1192
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
689,069✔
1193
    if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
350,626✔
1194
      (*pNumOfVnodes)++;
329,565✔
1195
    }
1196
  }
1197

1198
  return true;
338,443✔
1199
}
1200

1201
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
15,681✔
1202
  int32_t numOfVnodes = 0;
15,681✔
1203
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL);
15,681✔
1204
  return numOfVnodes;
15,692✔
1205
}
1206

1207
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
35,919✔
1208
  SDbObj *pDb = pDbInput;
35,919✔
1209
  if (pDbInput == NULL) {
35,919✔
1210
    pDb = mndAcquireDb(pMnode, pVgroup->dbName);
25,054✔
1211
  }
1212

1213
  int64_t vgroupMemroy = 0;
35,919✔
1214
  if (pDb != NULL) {
35,919✔
1215
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
35,917✔
1216
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
35,917✔
1217
    vgroupMemroy = buffer + cache;
35,917✔
1218
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
35,917✔
1219
    if (pDb->cfg.cacheLast > 0) {
35,917✔
1220
      vgroupMemroy += cacheLast;
9,912✔
1221
    }
1222
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
35,917✔
1223
           buffer, cache, cacheLast);
1224
  }
1225

1226
  if (pDbInput == NULL) {
35,919✔
1227
    mndReleaseDb(pMnode, pDb);
25,054✔
1228
  }
1229
  return vgroupMemroy;
35,919✔
1230
}
1231

1232
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
27,904✔
1233
  SVgObj  *pVgroup = pObj;
27,904✔
1234
  int32_t  dnodeId = *(int32_t *)p1;
27,904✔
1235
  int64_t *pVnodeMemory = (int64_t *)p2;
27,904✔
1236

1237
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
61,277✔
1238
    if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
33,373✔
1239
      *pVnodeMemory += mndGetVgroupMemory(pMnode, NULL, pVgroup);
24,810✔
1240
    }
1241
  }
1242

1243
  return true;
27,904✔
1244
}
1245

1246
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
5,908✔
1247
  int64_t vnodeMemory = 0;
5,908✔
1248
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &vnodeMemory, NULL);
5,908✔
1249
  return vnodeMemory;
5,908✔
1250
}
1251

1252
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
14✔
1253
  if (rate == 0) {
14✔
1254
    snprintf(restoreStr, restoreStrSize, "0:0:0");
4✔
1255
    return;
4✔
1256
  }
1257

1258
  int64_t costTime = applyCount / rate;
10✔
1259
  int64_t totalSeconds = costTime / 1000;
10✔
1260
  int64_t hours = totalSeconds / 3600;
10✔
1261
  totalSeconds %= 3600;
10✔
1262
  int64_t minutes = totalSeconds / 60;
10✔
1263
  int64_t seconds = totalSeconds % 60;
10✔
1264
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
10✔
1265
}
1266

1267
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
9,102✔
1268
  SMnode *pMnode = pReq->info.node;
9,102✔
1269
  SSdb   *pSdb = pMnode->pSdb;
9,102✔
1270
  int32_t numOfRows = 0;
9,102✔
1271
  SVgObj *pVgroup = NULL;
9,102✔
1272
  int32_t cols = 0;
9,102✔
1273
  int64_t curMs = taosGetTimestampMs();
9,102✔
1274
  int32_t code = 0;
9,102✔
1275

1276
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
347,350!
1277
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
347,350✔
1278
    if (pShow->pIter == NULL) break;
347,307✔
1279

1280
    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
676,625!
1281
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
338,372✔
1282
      SColumnInfoData *pColInfo = NULL;
338,372✔
1283
      cols = 0;
338,372✔
1284

1285
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
338,372✔
1286
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
338,030✔
1287
      if (code != 0) {
337,643!
1288
        mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
×
1289
        return code;
×
1290
      }
1291
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
337,643✔
1292
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
337,271✔
1293
      if (code != 0) {
337,302!
1294
        mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
×
1295
        return code;
×
1296
      }
1297

1298
      // db_name
1299
      const char *dbname = mndGetDbStr(pVgroup->dbName);
337,302✔
1300
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
337,485✔
1301
      if (dbname != NULL) {
337,485!
1302
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
338,050✔
1303
      } else {
1304
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
×
1305
      }
1306
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
337,485✔
1307
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
337,532✔
1308
      if (code != 0) {
337,432!
1309
        mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1310
        return code;
×
1311
      }
1312

1313
      // dnode is online?
1314
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
337,432✔
1315
      if (pDnode == NULL) {
338,299!
1316
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
×
1317
        break;
×
1318
      }
1319
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
338,299✔
1320

1321
      char       buf[20] = {0};
338,254✔
1322
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
338,254✔
1323
      STR_TO_VARSTR(buf, syncStr(syncState));
338,254✔
1324
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
338,127✔
1325
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
337,479✔
1326
      if (code != 0) {
337,025!
1327
        mError("vgId:%d, failed to set syncState, since %s", pVgroup->vgId, tstrerror(code));
×
1328
        return code;
×
1329
      }
1330

1331
      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
337,025✔
1332
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
337,025✔
1333
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
336,619✔
1334
      if (code != 0) {
336,577!
1335
        mError("vgId:%d, failed to set roleTimeMs, since %s", pVgroup->vgId, tstrerror(code));
×
1336
        return code;
×
1337
      }
1338

1339
      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
336,577✔
1340
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
336,577✔
1341
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
336,102✔
1342
      if (code != 0) {
336,392!
1343
        mError("vgId:%d, failed to set startTimeMs, since %s", pVgroup->vgId, tstrerror(code));
×
1344
        return code;
×
1345
      }
1346

1347
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
336,392✔
1348
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
335,974✔
1349
      if (code != 0) {
336,334!
1350
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
×
1351
        return code;
×
1352
      }
1353

1354
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
336,334✔
1355
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
336,334✔
1356
      char restoreStr[20] = {0};
335,887✔
1357
      if (unappliedCount > 0) {
335,887✔
1358
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
14✔
1359
      }
1360
      STR_TO_VARSTR(buf, restoreStr);
335,887✔
1361
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&buf, false);
335,887✔
1362
      if (code != 0) {
337,262!
1363
        mError("vgId:%d, failed to set syncRestore finish time, since %s", pVgroup->vgId, tstrerror(code));
×
1364
        return code;
×
1365
      }
1366

1367
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
337,262✔
1368
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&unappliedCount, false);
336,565✔
1369
      if (code != 0) {
336,581!
1370
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
×
1371
        return code;
×
1372
      }
1373

1374
      numOfRows++;
336,581✔
1375
      sdbRelease(pSdb, pDnode);
336,581✔
1376
    }
1377

1378
    sdbRelease(pSdb, pVgroup);
338,253✔
1379
  }
1380

1381
  pShow->numOfRows += numOfRows;
9,106✔
1382
  return numOfRows;
9,106✔
1383
}
1384

1385
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
×
1386
  SSdb *pSdb = pMnode->pSdb;
×
1387
  sdbCancelFetchByType(pSdb, pIter, SDB_VGROUP);
×
1388
}
×
1389

1390
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
137✔
1391
  int32_t code = 0;
137✔
1392
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
137✔
1393
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
524✔
1394
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
387✔
1395
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
387!
1396
  }
1397

1398
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
137✔
1399
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
173✔
1400
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
163✔
1401

1402
    bool used = false;
163✔
1403
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
383✔
1404
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
256✔
1405
        used = true;
36✔
1406
        break;
36✔
1407
      }
1408
    }
1409
    if (used) continue;
163✔
1410

1411
    if (pDnode == NULL) {
127!
1412
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
×
1413
    }
1414
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
127!
1415
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
×
1416
    }
1417

1418
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
127✔
1419
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
127!
1420
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1421
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
1422
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
1423
    } else {
1424
      pDnode->memUsed += vgMem;
127✔
1425
    }
1426

1427
    pVgid->dnodeId = pDnode->id;
127✔
1428
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
127✔
1429
    mInfo("db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
127!
1430
          pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1431

1432
    pVgroup->replica++;
127✔
1433
    pDnode->numOfVnodes++;
127✔
1434

1435
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
127✔
1436
    if (pVgRaw == NULL) {
127!
1437
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1438
      if (terrno != 0) code = terrno;
×
1439
      TAOS_RETURN(code);
×
1440
    }
1441
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
127!
1442
      sdbFreeRaw(pVgRaw);
×
1443
      TAOS_RETURN(code);
×
1444
    }
1445
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
127✔
1446
    if (code != 0) {
127!
1447
      mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
1448
    }
1449
    TAOS_RETURN(code);
127✔
1450
  }
1451

1452
  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
10✔
1453
  mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
10!
1454
  TAOS_RETURN(code);
10✔
1455
}
1456

1457
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
27✔
1458
                                        SVnodeGid *pDelVgid) {
1459
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
27✔
1460
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
123✔
1461
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
96✔
1462
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
96!
1463
  }
1464

1465
  int32_t code = -1;
27✔
1466
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
39!
1467
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
39✔
1468

1469
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
95✔
1470
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
83✔
1471
      if (pVgid->dnodeId == pDnode->id) {
83✔
1472
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
27✔
1473
        pDnode->memUsed -= vgMem;
27✔
1474
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
27!
1475
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1476
        pDnode->numOfVnodes--;
27✔
1477
        pVgroup->replica--;
27✔
1478
        *pDelVgid = *pVgid;
27✔
1479
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
27✔
1480
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
27✔
1481
        code = 0;
27✔
1482
        goto _OVER;
27✔
1483
      }
1484
    }
1485
  }
1486

1487
_OVER:
×
1488
  if (code != 0) {
27!
1489
    code = TSDB_CODE_APP_ERROR;
×
1490
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1491
    TAOS_RETURN(code);
×
1492
  }
1493

1494
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
77✔
1495
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
50✔
1496
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
50!
1497
  }
1498

1499
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
27✔
1500
  if (pVgRaw == NULL) {
27!
1501
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1502
    if (terrno != 0) code = terrno;
×
1503
    TAOS_RETURN(code);
×
1504
  }
1505
  if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
27!
1506
    sdbFreeRaw(pVgRaw);
×
1507
    TAOS_RETURN(code);
×
1508
  }
1509
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
27✔
1510
  if (code != 0) {
27!
1511
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
1512
  }
1513

1514
  TAOS_RETURN(code);
27✔
1515
}
1516

1517
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1518
                                                   SVnodeGid *pDelVgid) {
1519
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1520
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1521
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1522
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1523
  }
1524

1525
  int32_t code = -1;
×
1526
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1527
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1528

1529
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1530
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1531
      if (pVgid->dnodeId == pDnode->id) {
×
1532
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1533
        pDnode->memUsed -= vgMem;
×
1534
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1535
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1536
        pDnode->numOfVnodes--;
×
1537
        pVgroup->replica--;
×
1538
        *pDelVgid = *pVgid;
×
1539
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1540
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1541
        code = 0;
×
1542
        goto _OVER;
×
1543
      }
1544
    }
1545
  }
1546

1547
_OVER:
×
1548
  if (code != 0) {
×
1549
    code = TSDB_CODE_APP_ERROR;
×
1550
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1551
    TAOS_RETURN(code);
×
1552
  }
1553

1554
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1555
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1556
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1557
  }
1558

1559
  TAOS_RETURN(code);
×
1560
}
1561

1562
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
9,698✔
1563
  int32_t      code = 0;
9,698✔
1564
  STransAction action = {0};
9,698✔
1565

1566
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
9,698✔
1567
  if (pDnode == NULL) return -1;
9,698!
1568
  action.epSet = mndGetDnodeEpset(pDnode);
9,698✔
1569
  mndReleaseDnode(pMnode, pDnode);
9,698✔
1570

1571
  int32_t contLen = 0;
9,698✔
1572
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
9,698✔
1573
  if (pReq == NULL) return -1;
9,698!
1574

1575
  action.pCont = pReq;
9,698✔
1576
  action.contLen = contLen;
9,698✔
1577
  action.msgType = TDMT_DND_CREATE_VNODE;
9,698✔
1578
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
9,698✔
1579

1580
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
9,698!
1581
    taosMemoryFree(pReq);
×
1582
    TAOS_RETURN(code);
×
1583
  }
1584

1585
  TAOS_RETURN(code);
9,698✔
1586
}
1587

1588
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1589
                                       SDnodeObj *pDnode) {
1590
  int32_t      code = 0;
10✔
1591
  STransAction action = {0};
10✔
1592

1593
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1594

1595
  int32_t contLen = 0;
10✔
1596
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
10✔
1597
  if (pReq == NULL) {
10!
1598
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1599
    if (terrno != 0) code = terrno;
×
1600
    TAOS_RETURN(code);
×
1601
  }
1602

1603
  action.pCont = pReq;
10✔
1604
  action.contLen = contLen;
10✔
1605
  action.msgType = TDMT_DND_CREATE_VNODE;
10✔
1606
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
10✔
1607

1608
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
1609
    taosMemoryFree(pReq);
×
1610
    TAOS_RETURN(code);
×
1611
  }
1612

1613
  TAOS_RETURN(code);
10✔
1614
}
1615

1616
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
460✔
1617
  int32_t      code = 0;
460✔
1618
  STransAction action = {0};
460✔
1619
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
460✔
1620

1621
  mInfo("vgId:%d, build alter vnode confirm req", pVgroup->vgId);
460!
1622
  int32_t   contLen = sizeof(SMsgHead);
460✔
1623
  SMsgHead *pHead = taosMemoryMalloc(contLen);
460!
1624
  if (pHead == NULL) {
460!
1625
    TAOS_RETURN(terrno);
×
1626
  }
1627

1628
  pHead->contLen = htonl(contLen);
460✔
1629
  pHead->vgId = htonl(pVgroup->vgId);
460✔
1630

1631
  action.pCont = pHead;
460✔
1632
  action.contLen = contLen;
460✔
1633
  action.msgType = TDMT_VND_ALTER_CONFIRM;
460✔
1634
  // incorrect redirect result will cause this erro
1635
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
460✔
1636

1637
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
460!
1638
    taosMemoryFree(pHead);
×
1639
    TAOS_RETURN(code);
×
1640
  }
1641

1642
  TAOS_RETURN(code);
460✔
1643
}
1644

1645
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
×
1646
                                 int32_t dnodeId) {
1647
  int32_t      code = 0;
×
1648
  STransAction action = {0};
×
1649
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);
×
1650

1651
  int32_t contLen = 0;
×
1652
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &contLen);
×
1653
  if (pReq == NULL) {
×
1654
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1655
    if (terrno != 0) code = terrno;
×
1656
    TAOS_RETURN(code);
×
1657
  }
1658

1659
  int32_t totallen = contLen + sizeof(SMsgHead);
×
1660

1661
  SMsgHead *pHead = taosMemoryMalloc(totallen);
×
1662
  if (pHead == NULL) {
×
1663
    taosMemoryFree(pReq);
×
1664
    TAOS_RETURN(terrno);
×
1665
  }
1666

1667
  pHead->contLen = htonl(totallen);
×
1668
  pHead->vgId = htonl(pNewVgroup->vgId);
×
1669

1670
  memcpy((void *)(pHead + 1), pReq, contLen);
×
1671
  taosMemoryFree(pReq);
×
1672

1673
  action.pCont = pHead;
×
1674
  action.contLen = totallen;
×
1675
  action.msgType = TDMT_SYNC_CONFIG_CHANGE;
×
1676

1677
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1678
    taosMemoryFree(pHead);
×
1679
    TAOS_RETURN(code);
×
1680
  }
1681

1682
  TAOS_RETURN(code);
×
1683
}
1684

1685
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
46✔
1686
  int32_t      code = 0;
46✔
1687
  STransAction action = {0};
46✔
1688
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
46✔
1689

1690
  int32_t contLen = 0;
46✔
1691
  void   *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &contLen);
46✔
1692
  if (pReq == NULL) {
46!
1693
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1694
    if (terrno != 0) code = terrno;
×
1695
    TAOS_RETURN(code);
×
1696
  }
1697

1698
  action.pCont = pReq;
46✔
1699
  action.contLen = contLen;
46✔
1700
  action.msgType = TDMT_VND_ALTER_HASHRANGE;
46✔
1701
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
46✔
1702

1703
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
46!
1704
    taosMemoryFree(pReq);
×
1705
    TAOS_RETURN(code);
×
1706
  }
1707

1708
  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
46!
1709
  TAOS_RETURN(code);
46✔
1710
}
1711

1712
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
413✔
1713
  int32_t      code = 0;
413✔
1714
  STransAction action = {0};
413✔
1715
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
413✔
1716

1717
  int32_t contLen = 0;
413✔
1718
  void   *pReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &contLen);
413✔
1719
  if (pReq == NULL) {
413!
1720
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1721
    if (terrno != 0) code = terrno;
×
1722
    TAOS_RETURN(code);
×
1723
  }
1724

1725
  action.pCont = pReq;
413✔
1726
  action.contLen = contLen;
413✔
1727
  action.msgType = TDMT_VND_ALTER_CONFIG;
413✔
1728

1729
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
413!
1730
    taosMemoryFree(pReq);
×
1731
    TAOS_RETURN(code);
×
1732
  }
1733

1734
  TAOS_RETURN(code);
413✔
1735
}
1736

1737
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
8,738✔
1738
  int32_t  code = 0;
8,738✔
1739
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
8,738✔
1740
  if (pRaw == NULL) {
8,738!
1741
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1742
    if (terrno != 0) code = terrno;
×
1743
    goto _err;
×
1744
  }
1745

1746
  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);
8,738!
1747
  if (sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
8,738!
1748
    mError("vgId:%d, failed to set raw status at line:%d", pVg->vgId, __LINE__);
×
1749
  }
1750
  if (code != 0) {
8,738!
1751
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
1752
    TAOS_RETURN(code);
×
1753
  }
1754
  pRaw = NULL;
8,738✔
1755
  TAOS_RETURN(code);
8,738✔
1756

1757
_err:
×
1758
  sdbFreeRaw(pRaw);
×
1759
  TAOS_RETURN(code);
×
1760
}
1761

1762
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
1,138✔
1763
  int32_t    code = 0;
1,138✔
1764
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
1,138✔
1765
  if (pDnode == NULL) {
1,138!
1766
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1767
    if (terrno != 0) code = terrno;
×
1768
    TAOS_RETURN(code);
×
1769
  }
1770

1771
  STransAction action = {0};
1,138✔
1772
  action.epSet = mndGetDnodeEpset(pDnode);
1,138✔
1773
  mndReleaseDnode(pMnode, pDnode);
1,138✔
1774

1775
  int32_t contLen = 0;
1,138✔
1776
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
1,138✔
1777
  if (pReq == NULL) {
1,138!
1778
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1779
    if (terrno != 0) code = terrno;
×
1780
    TAOS_RETURN(code);
×
1781
  }
1782

1783
  action.pCont = pReq;
1,138✔
1784
  action.contLen = contLen;
1,138✔
1785
  action.msgType = TDMT_VND_ALTER_REPLICA;
1,138✔
1786

1787
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
1,138!
1788
    taosMemoryFree(pReq);
×
1789
    TAOS_RETURN(code);
×
1790
  }
1791

1792
  TAOS_RETURN(code);
1,138✔
1793
}
1794

1795
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
×
1796
  int32_t    code = 0;
×
1797
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
1798
  if (pDnode == NULL) {
×
1799
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1800
    if (terrno != 0) code = terrno;
×
1801
    TAOS_RETURN(code);
×
1802
  }
1803

1804
  STransAction action = {0};
×
1805
  action.epSet = mndGetDnodeEpset(pDnode);
×
1806
  mndReleaseDnode(pMnode, pDnode);
×
1807

1808
  int32_t contLen = 0;
×
1809
  void   *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
×
1810
  if (pReq == NULL) {
×
1811
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1812
    if (terrno != 0) code = terrno;
×
1813
    TAOS_RETURN(code);
×
1814
  }
1815

1816
  action.pCont = pReq;
×
1817
  action.contLen = contLen;
×
1818
  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
×
1819
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1820
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
×
1821

1822
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1823
    taosMemoryFree(pReq);
×
1824
    TAOS_RETURN(code);
×
1825
  }
1826

1827
  TAOS_RETURN(code);
×
1828
}
1829

1830
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
168✔
1831
  int32_t    code = 0;
168✔
1832
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
168✔
1833
  if (pDnode == NULL) {
168!
1834
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1835
    if (terrno != 0) code = terrno;
×
1836
    TAOS_RETURN(code);
×
1837
  }
1838

1839
  STransAction action = {0};
168✔
1840
  action.epSet = mndGetDnodeEpset(pDnode);
168✔
1841
  mndReleaseDnode(pMnode, pDnode);
168✔
1842

1843
  int32_t contLen = 0;
168✔
1844
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
168✔
1845
  if (pReq == NULL) {
168!
1846
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1847
    if (terrno != 0) code = terrno;
×
1848
    TAOS_RETURN(code);
×
1849
  }
1850

1851
  action.pCont = pReq;
168✔
1852
  action.contLen = contLen;
168✔
1853
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
168✔
1854
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
168✔
1855
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
168✔
1856

1857
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
168!
1858
    taosMemoryFree(pReq);
×
1859
    TAOS_RETURN(code);
×
1860
  }
1861

1862
  TAOS_RETURN(code);
168✔
1863
}
1864

1865
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1866
                                          SDnodeObj *pDnode) {
1867
  int32_t      code = 0;
10✔
1868
  STransAction action = {0};
10✔
1869
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1870

1871
  int32_t contLen = 0;
10✔
1872
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
10✔
1873
  if (pReq == NULL) {
10!
1874
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1875
    if (terrno != 0) code = terrno;
×
1876
    TAOS_RETURN(code);
×
1877
  }
1878

1879
  action.pCont = pReq;
10✔
1880
  action.contLen = contLen;
10✔
1881
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
10✔
1882
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
10✔
1883
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
10✔
1884

1885
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
1886
    taosMemoryFree(pReq);
×
1887
    TAOS_RETURN(code);
×
1888
  }
1889

1890
  TAOS_RETURN(code);
10✔
1891
}
1892

1893
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
46✔
1894
                                             int32_t dnodeId) {
1895
  int32_t    code = 0;
46✔
1896
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
46✔
1897
  if (pDnode == NULL) {
46!
1898
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1899
    if (terrno != 0) code = terrno;
×
1900
    TAOS_RETURN(code);
×
1901
  }
1902

1903
  STransAction action = {0};
46✔
1904
  action.epSet = mndGetDnodeEpset(pDnode);
46✔
1905
  mndReleaseDnode(pMnode, pDnode);
46✔
1906

1907
  int32_t contLen = 0;
46✔
1908
  void   *pReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &contLen);
46✔
1909
  if (pReq == NULL) {
46!
1910
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1911
    if (terrno != 0) code = terrno;
×
1912
    TAOS_RETURN(code);
×
1913
  }
1914

1915
  action.pCont = pReq;
46✔
1916
  action.contLen = contLen;
46✔
1917
  action.msgType = TDMT_VND_DISABLE_WRITE;
46✔
1918

1919
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
46!
1920
    taosMemoryFree(pReq);
×
1921
    TAOS_RETURN(code);
×
1922
  }
1923

1924
  TAOS_RETURN(code);
46✔
1925
}
1926

1927
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
14,094✔
1928
                              bool isRedo) {
1929
  int32_t      code = 0;
14,094✔
1930
  STransAction action = {0};
14,094✔
1931

1932
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
14,094✔
1933
  if (pDnode == NULL) {
14,094!
1934
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1935
    if (terrno != 0) code = terrno;
×
1936
    TAOS_RETURN(code);
×
1937
  }
1938
  action.epSet = mndGetDnodeEpset(pDnode);
14,094✔
1939
  mndReleaseDnode(pMnode, pDnode);
14,094✔
1940

1941
  int32_t contLen = 0;
14,094✔
1942
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
14,094✔
1943
  if (pReq == NULL) {
14,094!
1944
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1945
    if (terrno != 0) code = terrno;
×
1946
    TAOS_RETURN(code);
×
1947
  }
1948

1949
  action.pCont = pReq;
14,094✔
1950
  action.contLen = contLen;
14,094✔
1951
  action.msgType = TDMT_DND_DROP_VNODE;
14,094✔
1952
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
14,094✔
1953

1954
  if (isRedo) {
14,094✔
1955
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
4,637!
1956
      taosMemoryFree(pReq);
×
1957
      TAOS_RETURN(code);
×
1958
    }
1959
  } else {
1960
    if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
9,457!
1961
      taosMemoryFree(pReq);
×
1962
      TAOS_RETURN(code);
×
1963
    }
1964
  }
1965

1966
  TAOS_RETURN(code);
14,094✔
1967
}
1968

1969
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
33✔
1970
                                    SArray *pArray, bool force, bool unsafe) {
1971
  int32_t code = 0;
33✔
1972
  SVgObj  newVg = {0};
33✔
1973
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
33✔
1974

1975
  mInfo("vgId:%d, vgroup info before move, replica:%d", newVg.vgId, newVg.replica);
33!
1976
  for (int32_t i = 0; i < newVg.replica; ++i) {
102✔
1977
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
69!
1978
  }
1979

1980
  if (!force) {
33✔
1981
#if 1
1982
    {
1983
#else
1984
    if (newVg.replica == 1) {
1985
#endif
1986
      mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
29!
1987
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
29✔
1988
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
88✔
1989
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
60!
1990
      }
1991
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
28!
1992
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
28!
1993

1994
      mInfo("vgId:%d, will remove 1 vnode, replca:2", pVgroup->vgId);
28!
1995
      newVg.replica--;
28✔
1996
      SVnodeGid del = newVg.vnodeGid[vnIndex];
28✔
1997
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
28✔
1998
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
28✔
1999
      {
2000
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
28✔
2001
        if (pRaw == NULL) {
28!
2002
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2003
          if (terrno != 0) code = terrno;
×
2004
          TAOS_RETURN(code);
×
2005
        }
2006
        if ((code = mndTransAppendRedolog(pTrans, pRaw)) != 0) {
28!
2007
          sdbFreeRaw(pRaw);
×
2008
          TAOS_RETURN(code);
×
2009
        }
2010
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
28✔
2011
        if (code != 0) {
28!
2012
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2013
          return code;
×
2014
        }
2015
      }
2016

2017
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
28!
2018
      for (int32_t i = 0; i < newVg.replica; ++i) {
88✔
2019
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
60!
2020
      }
2021
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
28!
2022
#if 1
2023
    }
2024
#else
2025
    } else {  // new replica == 3
2026
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2027
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2028
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2029
      newVg.replica--;
2030
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2031
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2032
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2033
      {
2034
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2035
        if (pRaw == NULL) return -1;
2036
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2037
          sdbFreeRaw(pRaw);
2038
          return -1;
2039
        }
2040
      }
2041

2042
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2043
      for (int32_t i = 0; i < newVg.replica; ++i) {
2044
        if (i == vnIndex) continue;
2045
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2046
      }
2047
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2048
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2049
    }
2050
#endif
2051
  } else {
2052
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
4!
2053
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
4!
2054
    newVg.replica--;
4✔
2055
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2056
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
4✔
2057
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
4✔
2058
    {
2059
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
4✔
2060
      if (pRaw == NULL) {
4!
2061
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2062
        if (terrno != 0) code = terrno;
×
2063
        TAOS_RETURN(code);
×
2064
      }
2065
      if ((code = mndTransAppendRedolog(pTrans, pRaw)) != 0) {
4!
2066
        sdbFreeRaw(pRaw);
×
2067
        TAOS_RETURN(code);
×
2068
      }
2069
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
4✔
2070
      if (code != 0) {
4!
2071
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2072
        return code;
×
2073
      }
2074
    }
2075

2076
    for (int32_t i = 0; i < newVg.replica; ++i) {
12✔
2077
      if (i != vnIndex) {
8✔
2078
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
4!
2079
      }
2080
    }
2081
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
4!
2082
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
4!
2083

2084
    if (newVg.replica == 1) {
4✔
2085
      if (force && !unsafe) {
2!
2086
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
1✔
2087
      }
2088

2089
      SSdb *pSdb = pMnode->pSdb;
1✔
2090
      void *pIter = NULL;
1✔
2091

2092
      while (1) {
3✔
2093
        SStbObj *pStb = NULL;
4✔
2094
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
4✔
2095
        if (pIter == NULL) break;
4✔
2096

2097
        if (strcmp(pStb->db, pDb->name) == 0) {
3✔
2098
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
2!
2099
            sdbCancelFetch(pSdb, pIter);
×
2100
            sdbRelease(pSdb, pStb);
×
2101
            TAOS_RETURN(code);
×
2102
          }
2103
        }
2104

2105
        sdbRelease(pSdb, pStb);
3✔
2106
      }
2107

2108
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
1!
2109
    }
2110
  }
2111

2112
  {
2113
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
31✔
2114
    if (pRaw == NULL) {
31!
2115
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2116
      if (terrno != 0) code = terrno;
×
2117
      TAOS_RETURN(code);
×
2118
    }
2119
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
31!
2120
      sdbFreeRaw(pRaw);
×
2121
      TAOS_RETURN(code);
×
2122
    }
2123
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
31✔
2124
    if (code != 0) {
31!
2125
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2126
      return code;
×
2127
    }
2128
  }
2129

2130
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
31!
2131
  for (int32_t i = 0; i < newVg.replica; ++i) {
98✔
2132
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
67!
2133
  }
2134
  TAOS_RETURN(code);
31✔
2135
}
2136

2137
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
16✔
2138
  int32_t code = 0;
16✔
2139
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
16✔
2140
  if (pArray == NULL) {
16!
2141
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2142
    if (terrno != 0) code = terrno;
×
2143
    TAOS_RETURN(code);
×
2144
  }
2145

2146
  void *pIter = NULL;
16✔
2147
  while (1) {
44✔
2148
    SVgObj *pVgroup = NULL;
60✔
2149
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
60✔
2150
    if (pIter == NULL) break;
60✔
2151

2152
    int32_t vnIndex = -1;
46✔
2153
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
81✔
2154
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
68✔
2155
        vnIndex = i;
33✔
2156
        break;
33✔
2157
      }
2158
    }
2159

2160
    code = 0;
46✔
2161
    if (vnIndex != -1) {
46✔
2162
      mInfo("vgId:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, vnIndex, delDnodeId, force);
33!
2163
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
33✔
2164
      code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
33✔
2165
      mndReleaseDb(pMnode, pDb);
33✔
2166
    }
2167

2168
    sdbRelease(pMnode->pSdb, pVgroup);
46✔
2169

2170
    if (code != 0) {
46✔
2171
      sdbCancelFetch(pMnode->pSdb, pIter);
2✔
2172
      break;
2✔
2173
    }
2174
  }
2175

2176
  taosArrayDestroy(pArray);
16✔
2177
  TAOS_RETURN(code);
16✔
2178
}
2179

2180
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
114✔
2181
                                             int32_t newDnodeId) {
2182
  int32_t code = 0;
114✔
2183
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
114!
2184

2185
  // assoc dnode
2186
  SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
114✔
2187
  pVgroup->replica++;
114✔
2188
  pGid->dnodeId = newDnodeId;
114✔
2189
  pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
114✔
2190
  pGid->nodeRole = TAOS_SYNC_ROLE_LEARNER;
114✔
2191

2192
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
114✔
2193
  if (pVgRaw == NULL) {
114!
2194
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2195
    if (terrno != 0) code = terrno;
×
2196
    TAOS_RETURN(code);
×
2197
  }
2198
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
114!
2199
    sdbFreeRaw(pVgRaw);
×
2200
    TAOS_RETURN(code);
×
2201
  }
2202
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
114✔
2203
  if (code != 0) {
114!
2204
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
2205
    TAOS_RETURN(code);
×
2206
  }
2207

2208
  // learner
2209
  for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
372✔
2210
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
258!
2211
  }
2212
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid));
114!
2213

2214
  // voter
2215
  pGid->nodeRole = TAOS_SYNC_ROLE_VOTER;
114✔
2216
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pGid->dnodeId));
114!
2217
  for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
372✔
2218
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
258!
2219
  }
2220

2221
  // confirm
2222
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));
114!
2223

2224
  TAOS_RETURN(code);
114✔
2225
}
2226

2227
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
114✔
2228
                                               int32_t delDnodeId) {
2229
  int32_t code = 0;
114✔
2230
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
114!
2231

2232
  SVnodeGid *pGid = NULL;
114✔
2233
  SVnodeGid  delGid = {0};
114✔
2234
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
193!
2235
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
193✔
2236
      pGid = &pVgroup->vnodeGid[i];
114✔
2237
      break;
114✔
2238
    }
2239
  }
2240

2241
  if (pGid == NULL) return 0;
114!
2242

2243
  pVgroup->replica--;
114✔
2244
  memcpy(&delGid, pGid, sizeof(SVnodeGid));
114✔
2245
  memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
114✔
2246
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
114✔
2247

2248
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
114✔
2249
  if (pVgRaw == NULL) {
114!
2250
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2251
    if (terrno != 0) code = terrno;
×
2252
    TAOS_RETURN(code);
×
2253
  }
2254
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
114!
2255
    sdbFreeRaw(pVgRaw);
×
2256
    TAOS_RETURN(code);
×
2257
  }
2258
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
114✔
2259
  if (code != 0) {
114!
2260
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
2261
    TAOS_RETURN(code);
×
2262
  }
2263

2264
  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
114!
2265
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
372✔
2266
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
258!
2267
  }
2268
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));
114!
2269

2270
  TAOS_RETURN(code);
114✔
2271
}
2272

2273
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
68✔
2274
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2275
                                     SDnodeObj *pOld3) {
2276
  int32_t code = -1;
68✔
2277
  STrans *pTrans = NULL;
68✔
2278

2279
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
68✔
2280
  if (pTrans == NULL) {
68!
2281
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2282
    if (terrno != 0) code = terrno;
×
2283
    goto _OVER;
×
2284
  }
2285

2286
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
68✔
2287
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
68✔
2288

2289
  mndTransSetSerial(pTrans);
67✔
2290
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
67!
2291

2292
  SVgObj newVg = {0};
67✔
2293
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
67✔
2294
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
67!
2295
  for (int32_t i = 0; i < newVg.replica; ++i) {
212✔
2296
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
145!
2297
          syncStr(newVg.vnodeGid[i].syncState));
2298
  }
2299

2300
  if (pNew1 != NULL && pOld1 != NULL) {
67!
2301
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
67✔
2302
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
67✔
2303
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
1!
2304
             pNew1->numOfSupportVnodes);
2305
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
1✔
2306
      goto _OVER;
1✔
2307
    }
2308

2309
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
66✔
2310
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
66!
2311
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2312
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2313
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2314
      goto _OVER;
×
2315
    } else {
2316
      pNew1->memUsed += vgMem;
66✔
2317
    }
2318

2319
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
66!
2320
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
66!
2321
  }
2322

2323
  if (pNew2 != NULL && pOld2 != NULL) {
66!
2324
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
16✔
2325
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
16!
2326
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2327
             pNew2->numOfSupportVnodes);
2328
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2329
      goto _OVER;
×
2330
    }
2331
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
16✔
2332
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
16!
2333
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2334
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2335
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2336
      goto _OVER;
×
2337
    } else {
2338
      pNew2->memUsed += vgMem;
16✔
2339
    }
2340
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
16!
2341
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
16!
2342
  }
2343

2344
  if (pNew3 != NULL && pOld3 != NULL) {
66!
2345
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
8✔
2346
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
8!
2347
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2348
             pNew3->numOfSupportVnodes);
2349
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2350
      goto _OVER;
×
2351
    }
2352
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
8✔
2353
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
8!
2354
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2355
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2356
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2357
      goto _OVER;
×
2358
    } else {
2359
      pNew3->memUsed += vgMem;
8✔
2360
    }
2361
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
8!
2362
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
8!
2363
  }
2364

2365
  {
2366
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
66✔
2367
    if (pRaw == NULL) {
66!
2368
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2369
      if (terrno != 0) code = terrno;
×
2370
      goto _OVER;
×
2371
    }
2372
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
66!
2373
      sdbFreeRaw(pRaw);
×
2374
      goto _OVER;
×
2375
    }
2376
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
66✔
2377
    if (code != 0) {
66!
2378
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2379
      goto _OVER;
×
2380
    }
2381
  }
2382

2383
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
66!
2384
  for (int32_t i = 0; i < newVg.replica; ++i) {
208✔
2385
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
142!
2386
  }
2387

2388
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
66!
2389
  code = 0;
66✔
2390

2391
_OVER:
68✔
2392
  mndTransDrop(pTrans);
68✔
2393
  mndReleaseDb(pMnode, pDb);
68✔
2394
  TAOS_RETURN(code);
68✔
2395
}
2396

2397
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
84✔
2398
  SMnode    *pMnode = pReq->info.node;
84✔
2399
  SDnodeObj *pNew1 = NULL;
84✔
2400
  SDnodeObj *pNew2 = NULL;
84✔
2401
  SDnodeObj *pNew3 = NULL;
84✔
2402
  SDnodeObj *pOld1 = NULL;
84✔
2403
  SDnodeObj *pOld2 = NULL;
84✔
2404
  SDnodeObj *pOld3 = NULL;
84✔
2405
  SVgObj    *pVgroup = NULL;
84✔
2406
  SDbObj    *pDb = NULL;
84✔
2407
  int32_t    code = -1;
84✔
2408
  int64_t    curMs = taosGetTimestampMs();
84✔
2409
  int32_t    newDnodeId[3] = {0};
84✔
2410
  int32_t    oldDnodeId[3] = {0};
84✔
2411
  int32_t    newIndex = -1;
84✔
2412
  int32_t    oldIndex = -1;
84✔
2413

2414
  SRedistributeVgroupReq req = {0};
84✔
2415
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
84!
2416
    code = TSDB_CODE_INVALID_MSG;
×
2417
    goto _OVER;
×
2418
  }
2419

2420
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
84!
2421
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
84✔
2422
    goto _OVER;
1✔
2423
  }
2424

2425
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
83✔
2426
  if (pVgroup == NULL) {
83✔
2427
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
3✔
2428
    if (terrno != 0) code = terrno;
3!
2429
    goto _OVER;
3✔
2430
  }
2431

2432
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
80✔
2433
  if (pDb == NULL) {
80!
2434
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2435
    if (terrno != 0) code = terrno;
×
2436
    goto _OVER;
×
2437
  }
2438

2439
  if (pVgroup->replica == 1) {
80✔
2440
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
32!
2441
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2442
      goto _OVER;
×
2443
    }
2444

2445
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
32✔
2446
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2447
      code = 0;
1✔
2448
      goto _OVER;
1✔
2449
    }
2450

2451
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
31✔
2452
    if (pNew1 == NULL) {
31!
2453
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2454
      if (terrno != 0) code = terrno;
×
2455
      goto _OVER;
×
2456
    }
2457
    if (!mndIsDnodeOnline(pNew1, curMs)) {
31!
2458
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2459
      goto _OVER;
×
2460
    }
2461

2462
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
31✔
2463
    if (pOld1 == NULL) {
31!
2464
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2465
      if (terrno != 0) code = terrno;
×
2466
      goto _OVER;
×
2467
    }
2468
    if (!mndIsDnodeOnline(pOld1, curMs)) {
31✔
2469
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2470
      goto _OVER;
2✔
2471
    }
2472

2473
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
29✔
2474

2475
  } else if (pVgroup->replica == 3) {
48!
2476
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
48!
2477
      code = TSDB_CODE_MND_INVALID_REPLICA;
4✔
2478
      goto _OVER;
4✔
2479
    }
2480

2481
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
44!
2482
      code = TSDB_CODE_MND_INVALID_REPLICA;
1✔
2483
      goto _OVER;
1✔
2484
    }
2485

2486
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
43✔
2487
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
22✔
2488
      newDnodeId[++newIndex] = req.dnodeId1;
19✔
2489
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
19!
2490
    }
2491

2492
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
43✔
2493
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
29✔
2494
      newDnodeId[++newIndex] = req.dnodeId2;
22✔
2495
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
22!
2496
    }
2497

2498
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
43✔
2499
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
35✔
2500
      newDnodeId[++newIndex] = req.dnodeId3;
29✔
2501
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
29!
2502
    }
2503

2504
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
43✔
2505
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
24✔
2506
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
19✔
2507
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
19!
2508
    }
2509

2510
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
43✔
2511
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
27✔
2512
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
24✔
2513
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
24!
2514
    }
2515

2516
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
43✔
2517
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
33✔
2518
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
27✔
2519
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
27!
2520
    }
2521

2522
    if (newDnodeId[0] != 0) {
43✔
2523
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
42✔
2524
      if (pNew1 == NULL) {
42!
2525
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2526
        if (terrno != 0) code = terrno;
×
2527
        goto _OVER;
×
2528
      }
2529
      if (!mndIsDnodeOnline(pNew1, curMs)) {
42✔
2530
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1✔
2531
        goto _OVER;
1✔
2532
      }
2533
    }
2534

2535
    if (newDnodeId[1] != 0) {
42✔
2536
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
18✔
2537
      if (pNew2 == NULL) {
18!
2538
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2539
        if (terrno != 0) code = terrno;
×
2540
        goto _OVER;
×
2541
      }
2542
      if (!mndIsDnodeOnline(pNew2, curMs)) {
18!
2543
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2544
        goto _OVER;
×
2545
      }
2546
    }
2547

2548
    if (newDnodeId[2] != 0) {
42✔
2549
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
10✔
2550
      if (pNew3 == NULL) {
10!
2551
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2552
        if (terrno != 0) code = terrno;
×
2553
        goto _OVER;
×
2554
      }
2555
      if (!mndIsDnodeOnline(pNew3, curMs)) {
10!
2556
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2557
        goto _OVER;
×
2558
      }
2559
    }
2560

2561
    if (oldDnodeId[0] != 0) {
42✔
2562
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
41✔
2563
      if (pOld1 == NULL) {
41!
2564
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2565
        if (terrno != 0) code = terrno;
×
2566
        goto _OVER;
×
2567
      }
2568
      if (!mndIsDnodeOnline(pOld1, curMs)) {
41✔
2569
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2570
        goto _OVER;
2✔
2571
      }
2572
    }
2573

2574
    if (oldDnodeId[1] != 0) {
40✔
2575
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
16✔
2576
      if (pOld2 == NULL) {
16!
2577
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2578
        if (terrno != 0) code = terrno;
×
2579
        goto _OVER;
×
2580
      }
2581
      if (!mndIsDnodeOnline(pOld2, curMs)) {
16!
2582
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2583
        goto _OVER;
×
2584
      }
2585
    }
2586

2587
    if (oldDnodeId[2] != 0) {
40✔
2588
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
8✔
2589
      if (pOld3 == NULL) {
8!
2590
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2591
        if (terrno != 0) code = terrno;
×
2592
        goto _OVER;
×
2593
      }
2594
      if (!mndIsDnodeOnline(pOld3, curMs)) {
8!
2595
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2596
        goto _OVER;
×
2597
      }
2598
    }
2599

2600
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
40!
2601
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2602
      code = 0;
1✔
2603
      goto _OVER;
1✔
2604
    }
2605

2606
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
39✔
2607

2608
  } else {
2609
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2610
    goto _OVER;
×
2611
  }
2612

2613
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
68✔
2614

2615
  char obj[33] = {0};
68✔
2616
  (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
68✔
2617

2618
  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
68✔
2619

2620
_OVER:
84✔
2621
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
84✔
2622
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
16!
2623
           req.dnodeId3, tstrerror(code));
2624
  }
2625

2626
  mndReleaseDnode(pMnode, pNew1);
84✔
2627
  mndReleaseDnode(pMnode, pNew2);
84✔
2628
  mndReleaseDnode(pMnode, pNew3);
84✔
2629
  mndReleaseDnode(pMnode, pOld1);
84✔
2630
  mndReleaseDnode(pMnode, pOld2);
84✔
2631
  mndReleaseDnode(pMnode, pOld3);
84✔
2632
  mndReleaseVgroup(pMnode, pVgroup);
84✔
2633
  mndReleaseDb(pMnode, pDb);
84✔
2634
  tFreeSRedistributeVgroupReq(&req);
84✔
2635

2636
  TAOS_RETURN(code);
84✔
2637
}
2638

2639
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
12✔
2640
  SForceBecomeFollowerReq balanceReq = {
12✔
2641
      .vgId = pVgroup->vgId,
12✔
2642
  };
2643

2644
  int32_t contLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
12✔
2645
  if (contLen < 0) {
12!
2646
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2647
    return NULL;
×
2648
  }
2649
  contLen += sizeof(SMsgHead);
12✔
2650

2651
  void *pReq = taosMemoryMalloc(contLen);
12!
2652
  if (pReq == NULL) {
12!
2653
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2654
    return NULL;
×
2655
  }
2656

2657
  SMsgHead *pHead = pReq;
12✔
2658
  pHead->contLen = htonl(contLen);
12✔
2659
  pHead->vgId = htonl(pVgroup->vgId);
12✔
2660

2661
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), contLen, &balanceReq) < 0) {
12!
2662
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2663
    taosMemoryFree(pReq);
×
2664
    return NULL;
×
2665
  }
2666
  *pContLen = contLen;
12✔
2667
  return pReq;
12✔
2668
}
2669

2670
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
12✔
2671
  int32_t    code = 0;
12✔
2672
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
12✔
2673
  if (pDnode == NULL) {
12!
2674
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2675
    if (terrno != 0) code = terrno;
×
2676
    TAOS_RETURN(code);
×
2677
  }
2678

2679
  STransAction action = {0};
12✔
2680
  action.epSet = mndGetDnodeEpset(pDnode);
12✔
2681
  mndReleaseDnode(pMnode, pDnode);
12✔
2682

2683
  int32_t contLen = 0;
12✔
2684
  void   *pReq = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen);
12✔
2685
  if (pReq == NULL) {
12!
2686
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2687
    if (terrno != 0) code = terrno;
×
2688
    TAOS_RETURN(code);
×
2689
  }
2690

2691
  action.pCont = pReq;
12✔
2692
  action.contLen = contLen;
12✔
2693
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;
12✔
2694

2695
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
12!
2696
    taosMemoryFree(pReq);
×
2697
    TAOS_RETURN(code);
×
2698
  }
2699

2700
  TAOS_RETURN(code);
12✔
2701
}
2702

2703
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans) {
14✔
2704
  int32_t code = 0;
14✔
2705
  SSdb   *pSdb = pMnode->pSdb;
14✔
2706

2707
  int32_t vgid = pVgroup->vgId;
14✔
2708
  int8_t  replica = pVgroup->replica;
14✔
2709

2710
  if (pVgroup->replica <= 1) {
14✔
2711
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
1!
2712
    return -1;
1✔
2713
  }
2714

2715
  int32_t dnodeId = 0;
13✔
2716

2717
  for (int i = 0; i < replica; i++) {
26✔
2718
    if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER) {
25✔
2719
      dnodeId = pVgroup->vnodeGid[i].dnodeId;
12✔
2720
      break;
12✔
2721
    }
2722
  }
2723

2724
  bool       exist = false;
13✔
2725
  bool       online = false;
13✔
2726
  int64_t    curMs = taosGetTimestampMs();
13✔
2727
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
13✔
2728
  if (pDnode != NULL) {
13✔
2729
    exist = true;
12✔
2730
    online = mndIsDnodeOnline(pDnode, curMs);
12✔
2731
    mndReleaseDnode(pMnode, pDnode);
12✔
2732
  }
2733

2734
  if (exist && online) {
25!
2735
    mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, dnodeId);
12!
2736

2737
    if ((code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId)) != 0) {
12!
2738
      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
×
2739
      TAOS_RETURN(code);
×
2740
    }
2741

2742
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
12✔
2743
    if (pDb == NULL) {
12!
2744
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2745
      if (terrno != 0) code = terrno;
×
2746
      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
×
2747
      TAOS_RETURN(code);
×
2748
    }
2749

2750
    mndReleaseDb(pMnode, pDb);
12✔
2751
  } else {
2752
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
1!
2753
          online);
2754
  }
2755

2756
  TAOS_RETURN(code);
13✔
2757
}
2758

2759
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);
2760

2761
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) { return mndProcessVgroupBalanceLeaderMsgImp(pReq); }
5✔
2762

2763
#ifndef TD_ENTERPRISE
2764
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) { return 0; }
2765
#endif
2766

2767
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
413✔
2768
                                   SVgObj *pNewVgroup, SArray *pArray) {
2769
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
1,064✔
2770
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
651✔
2771
    bool       inVgroup = false;
651✔
2772
    int64_t    oldMemUsed = 0;
651✔
2773
    int64_t    newMemUsed = 0;
651✔
2774
    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
651✔
2775
           pDnode->id, pDnode->memAvail, pDnode->memUsed);
2776
    for (int32_t j = 0; j < pOldVgroup->replica; ++j) {
1,926✔
2777
      SVnodeGid *pVgId = &pOldVgroup->vnodeGid[j];
1,275✔
2778
      if (pDnode->id == pVgId->dnodeId) {
1,275✔
2779
        oldMemUsed = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
621✔
2780
        inVgroup = true;
621✔
2781
      }
2782
    }
2783
    for (int32_t j = 0; j < pNewVgroup->replica; ++j) {
1,926✔
2784
      SVnodeGid *pVgId = &pNewVgroup->vnodeGid[j];
1,275✔
2785
      if (pDnode->id == pVgId->dnodeId) {
1,275✔
2786
        newMemUsed = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
621✔
2787
        inVgroup = true;
621✔
2788
      }
2789
    }
2790

2791
    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
651✔
2792
           pNewVgroup->vgId, pDnode->id, oldMemUsed, newMemUsed);
2793

2794
    pDnode->memUsed = pDnode->memUsed - oldMemUsed + newMemUsed;
651✔
2795
    if (pDnode->memAvail - pDnode->memUsed <= 0) {
651!
2796
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
×
2797
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
2798
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
2799
    } else if (inVgroup) {
651✔
2800
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
621!
2801
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
2802
    } else {
2803
    }
2804
  }
2805
  return 0;
413✔
2806
}
2807

2808
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
467✔
2809
                                  SArray *pArray, SVgObj *pNewVgroup) {
2810
  int32_t code = 0;
467✔
2811
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
467✔
2812

2813
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
467!
2814
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
413!
2815
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
413!
2816
    return 0;
413✔
2817
  }
2818

2819
  mndTransSetSerial(pTrans);
54✔
2820

2821
  if (pNewDb->cfg.replications == 3) {
54✔
2822
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
50!
2823
          pVgroup->vnodeGid[0].dnodeId);
2824

2825
    // add second
2826
    if (pNewVgroup->replica == 1) {
50!
2827
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
50!
2828
    }
2829

2830
    // learner stage
2831
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
50✔
2832
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
50✔
2833
    TAOS_CHECK_RETURN(
50!
2834
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2835

2836
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
50!
2837

2838
    // follower stage
2839
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
50✔
2840
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
50!
2841
    TAOS_CHECK_RETURN(
50!
2842
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2843

2844
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
50!
2845

2846
    // add third
2847
    if (pNewVgroup->replica == 2) {
50!
2848
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
50✔
2849
    }
2850

2851
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
41✔
2852
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
41✔
2853
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
41✔
2854
    TAOS_CHECK_RETURN(
41!
2855
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2856
    TAOS_CHECK_RETURN(
41!
2857
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
2858
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
41!
2859

2860
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
41!
2861
  } else if (pNewDb->cfg.replications == 1) {
4!
2862
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
4!
2863
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
2864

2865
    SVnodeGid del1 = {0};
4✔
2866
    SVnodeGid del2 = {0};
4✔
2867
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
4!
2868
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
4!
2869
    TAOS_CHECK_RETURN(
4!
2870
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2871
    TAOS_CHECK_RETURN(
4!
2872
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
2873
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4!
2874

2875
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
4!
2876
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
4!
2877
    TAOS_CHECK_RETURN(
4!
2878
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2879
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4!
2880
  } else if (pNewDb->cfg.replications == 2) {
×
2881
    mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2882
          pVgroup->vnodeGid[0].dnodeId);
2883

2884
    // add second
2885
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
2886

2887
    // learner stage
2888
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2889
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2890
    TAOS_CHECK_RETURN(
×
2891
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2892

2893
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
×
2894

2895
    // follower stage
2896
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2897
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
×
2898
    TAOS_CHECK_RETURN(
×
2899
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2900

2901
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
2902
  } else {
2903
    return -1;
×
2904
  }
2905

2906
  mndSortVnodeGid(pNewVgroup);
45✔
2907

2908
  {
2909
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
45✔
2910
    if (pVgRaw == NULL) {
45!
2911
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2912
      if (terrno != 0) code = terrno;
×
2913
      TAOS_RETURN(code);
×
2914
    }
2915
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
45!
2916
      sdbFreeRaw(pVgRaw);
×
2917
      TAOS_RETURN(code);
×
2918
    }
2919
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
45✔
2920
    if (code != 0) {
45!
2921
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
2922
      TAOS_RETURN(code);
×
2923
    }
2924
  }
2925

2926
  TAOS_RETURN(code);
45✔
2927
}
2928

2929
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
2930
                                      SArray *pArray) {
2931
  int32_t code = 0;
×
2932
  SVgObj  newVgroup = {0};
×
2933
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
2934

2935
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
2936
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
2937
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
2938
    return 0;
×
2939
  }
2940

2941
  mndTransSetSerial(pTrans);
×
2942

2943
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2944
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
2945

2946
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
2947
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2948
          pVgroup->vnodeGid[0].dnodeId);
2949

2950
    // add second
2951
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
2952
    // add third
2953
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
2954

2955
    // add learner stage
2956
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2957
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2958
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2959
    TAOS_CHECK_RETURN(
×
2960
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2961
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
2962
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2963
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
2964
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2965
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2966
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
2967
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2968
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2969

2970
    // check learner
2971
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2972
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2973
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2974
    TAOS_CHECK_RETURN(
×
2975
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
2976
    TAOS_CHECK_RETURN(
×
2977
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
2978

2979
    // change raft type
2980
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2981
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2982
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2983
    TAOS_CHECK_RETURN(
×
2984
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2985

2986
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2987

2988
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2989
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2990
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2991
    TAOS_CHECK_RETURN(
×
2992
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2993

2994
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2995

2996
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
2997
    if (pVgRaw == NULL) {
×
2998
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2999
      if (terrno != 0) code = terrno;
×
3000
      TAOS_RETURN(code);
×
3001
    }
3002
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3003
      sdbFreeRaw(pVgRaw);
×
3004
      TAOS_RETURN(code);
×
3005
    }
3006
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3007
    if (code != 0) {
×
3008
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3009
             __LINE__);
3010
      TAOS_RETURN(code);
×
3011
    }
3012
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3013
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3014
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3015

3016
    SVnodeGid del1 = {0};
×
3017
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3018

3019
    TAOS_CHECK_RETURN(
×
3020
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3021

3022
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3023

3024
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3025

3026
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3027
    if (pVgRaw == NULL) {
×
3028
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3029
      if (terrno != 0) code = terrno;
×
3030
      TAOS_RETURN(code);
×
3031
    }
3032
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3033
      sdbFreeRaw(pVgRaw);
×
3034
      TAOS_RETURN(code);
×
3035
    }
3036
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3037
    if (code != 0) {
×
3038
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3039
             __LINE__);
3040
      TAOS_RETURN(code);
×
3041
    }
3042

3043
    SVnodeGid del2 = {0};
×
3044
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3045

3046
    TAOS_CHECK_RETURN(
×
3047
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3048

3049
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3050

3051
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3052

3053
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3054
    if (pVgRaw == NULL) {
×
3055
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3056
      if (terrno != 0) code = terrno;
×
3057
      TAOS_RETURN(code);
×
3058
    }
3059
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3060
      sdbFreeRaw(pVgRaw);
×
3061
      TAOS_RETURN(code);
×
3062
    }
3063
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3064
    if (code != 0) {
×
3065
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3066
             __LINE__);
3067
      TAOS_RETURN(code);
×
3068
    }
3069
  } else {
3070
    return -1;
×
3071
  }
3072

3073
  mndSortVnodeGid(&newVgroup);
×
3074

3075
  {
3076
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3077
    if (pVgRaw == NULL) {
×
3078
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3079
      if (terrno != 0) code = terrno;
×
3080
      TAOS_RETURN(code);
×
3081
    }
3082
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3083
      sdbFreeRaw(pVgRaw);
×
3084
      TAOS_RETURN(code);
×
3085
    }
3086
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3087
    if (code != 0) {
×
3088
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3089
             __LINE__);
3090
      TAOS_RETURN(code);
×
3091
    }
3092
  }
3093

3094
  TAOS_RETURN(code);
×
3095
}
3096

3097
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
10✔
3098
                                         SDnodeObj *pAnotherDnode) {
3099
  int32_t code = 0;
10✔
3100
  SVgObj  newVgroup = {0};
10✔
3101
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
10✔
3102

3103
  mInfo("db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);
10!
3104

3105
  if (newVgroup.replica == 1) {
10!
3106
    int selected = 0;
×
3107
    for (int i = 0; i < newVgroup.replica; i++) {
×
3108
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3109
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3110
        selected = i;
×
3111
      }
3112
    }
3113
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]));
×
3114
  } else if (newVgroup.replica == 2) {
10!
3115
    for (int i = 0; i < newVgroup.replica; i++) {
×
3116
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3117
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3118
      } else {
3119
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3120
      }
3121
    }
3122
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
×
3123

3124
    for (int i = 0; i < newVgroup.replica; i++) {
×
3125
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3126
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3127
      } else {
3128
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3129
      }
3130
    }
3131
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3132

3133
    for (int i = 0; i < newVgroup.replica; i++) {
×
3134
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3135
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3136
      }
3137
    }
3138
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3139
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
×
3140
  } else if (newVgroup.replica == 3) {
10!
3141
    for (int i = 0; i < newVgroup.replica; i++) {
40✔
3142
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
30✔
3143
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
10✔
3144
      } else {
3145
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
20✔
3146
      }
3147
    }
3148
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));
10!
3149

3150
    for (int i = 0; i < newVgroup.replica; i++) {
40✔
3151
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
30✔
3152
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
30✔
3153
      }
3154
    }
3155
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
10!
3156
  }
3157
  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
10✔
3158
  if (pVgRaw == NULL) {
10!
3159
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3160
    if (terrno != 0) code = terrno;
×
3161
    TAOS_RETURN(code);
×
3162
  }
3163
  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
10!
3164
    sdbFreeRaw(pVgRaw);
×
3165
    TAOS_RETURN(code);
×
3166
  }
3167
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
10✔
3168
  if (code != 0) {
10!
3169
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
×
3170
    TAOS_RETURN(code);
×
3171
  }
3172

3173
  TAOS_RETURN(code);
10✔
3174
}
3175

3176
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
3177
  return 0;
×
3178
}
3179

3180
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3181

3182
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
91✔
3183
  int32_t         code = 0;
91✔
3184
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
91✔
3185
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
91✔
3186
  if (pRaw == NULL) {
91!
3187
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3188
    if (terrno != 0) code = terrno;
×
3189
    goto _err;
×
3190
  }
3191
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
91!
3192
  code = sdbSetRawStatus(pRaw, vgStatus);
91✔
3193
  if (code != 0) {
91!
3194
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
3195
    goto _err;
×
3196
  }
3197
  pRaw = NULL;
91✔
3198
  TAOS_RETURN(code);
91✔
3199
_err:
×
3200
  sdbFreeRaw(pRaw);
×
3201
  TAOS_RETURN(code);
×
3202
}
3203

3204
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
37✔
3205
  int32_t         code = 0;
37✔
3206
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
37✔
3207
  SSdbRaw        *pRaw = mndDbActionEncode(pDb);
37✔
3208
  if (pRaw == NULL) {
37!
3209
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3210
    if (terrno != 0) code = terrno;
×
3211
    goto _err;
×
3212
  }
3213
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
37!
3214
  code = sdbSetRawStatus(pRaw, dbStatus);
37✔
3215
  if (code != 0) {
37!
3216
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
×
3217
    goto _err;
×
3218
  }
3219
  pRaw = NULL;
37✔
3220
  TAOS_RETURN(code);
37✔
3221
_err:
×
3222
  sdbFreeRaw(pRaw);
×
3223
  TAOS_RETURN(code);
×
3224
}
3225

3226
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
24✔
3227
  int32_t code = -1;
24✔
3228
  STrans *pTrans = NULL;
24✔
3229
  SDbObj  dbObj = {0};
24✔
3230
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
24✔
3231

3232
  int32_t numOfStreams = 0;
24✔
3233
  if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) {
24!
3234
    goto _OVER;
×
3235
  }
3236
  if (numOfStreams > 0) {
24!
3237
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
×
3238
    goto _OVER;
×
3239
  }
3240

3241
#if defined(USE_S3)
3242
  extern int8_t tsS3Enabled;
3243
  if (tsS3Enabled) {
24!
3244
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
3245
    mError("vgId:%d, db:%s, s3 exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3246
    goto _OVER;
×
3247
  }
3248
#endif
3249

3250
  if (pDb->cfg.withArbitrator) {
24!
3251
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
3252
    mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3253
    goto _OVER;
×
3254
  }
3255

3256
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
24✔
3257
  if (pTrans == NULL) {
24!
3258
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3259
    if (terrno != 0) code = terrno;
×
3260
    goto _OVER;
×
3261
  }
3262
  mndTransSetSerial(pTrans);
24✔
3263
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
24!
3264

3265
  mndTransSetDbName(pTrans, pDb->name, NULL);
24✔
3266
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
24✔
3267

3268
  SVgObj newVg1 = {0};
23✔
3269
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
23✔
3270
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
23!
3271
        newVg1.hashBegin, newVg1.hashEnd);
3272
  for (int32_t i = 0; i < newVg1.replica; ++i) {
84✔
3273
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
61!
3274
  }
3275

3276
  if (newVg1.replica == 1) {
23✔
3277
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);
4!
3278

3279
    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
4✔
3280
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
4!
3281
                    _OVER);
3282
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);
4!
3283

3284
    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
4✔
3285
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
4!
3286
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
4!
3287
                    _OVER);
3288

3289
    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
4!
3290
  } else if (newVg1.replica == 3) {
19!
3291
    SVnodeGid del1 = {0};
19✔
3292
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
19!
3293
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
19!
3294
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
19!
3295
                    _OVER);
3296
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
19!
3297
                    _OVER);
3298
  } else {
3299
    goto _OVER;
×
3300
  }
3301

3302
  for (int32_t i = 0; i < newVg1.replica; ++i) {
69✔
3303
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
46!
3304
                    _OVER);
3305
  }
3306
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
23!
3307

3308
  SVgObj newVg2 = {0};
23✔
3309
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
23✔
3310
  newVg1.replica = 1;
23✔
3311
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
23✔
3312
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));
23✔
3313

3314
  newVg2.replica = 1;
23✔
3315
  newVg2.hashBegin = newVg1.hashEnd + 1;
23✔
3316
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
23✔
3317
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));
23✔
3318

3319
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
23!
3320
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
3321
  for (int32_t i = 0; i < newVg1.replica; ++i) {
46✔
3322
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
23!
3323
  }
3324
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
23!
3325
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
3326
  for (int32_t i = 0; i < newVg1.replica; ++i) {
46✔
3327
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
23!
3328
  }
3329

3330
  // alter vgId and hash range
3331
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
23✔
3332
  int32_t srcVgId = newVg1.vgId;
23✔
3333
  newVg1.vgId = maxVgId;
23✔
3334
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
23!
3335
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);
23!
3336

3337
  maxVgId++;
23✔
3338
  srcVgId = newVg2.vgId;
23✔
3339
  newVg2.vgId = maxVgId;
23✔
3340
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
23!
3341
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);
23!
3342

3343
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
23!
3344
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);
23!
3345

3346
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
23!
3347
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
23!
3348
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);
23!
3349

3350
  // update db status
3351
  memcpy(&dbObj, pDb, sizeof(SDbObj));
23✔
3352
  if (dbObj.cfg.pRetensions != NULL) {
23!
3353
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
×
3354
    if (dbObj.cfg.pRetensions == NULL) {
×
3355
      code = terrno;
×
3356
      goto _OVER;
×
3357
    }
3358
  }
3359
  dbObj.vgVersion++;
23✔
3360
  dbObj.updateTime = taosGetTimestampMs();
23✔
3361
  dbObj.cfg.numOfVgroups++;
23✔
3362
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
23!
3363

3364
  // adjust vgroup replica
3365
  if (pDb->cfg.replications != newVg1.replica) {
23✔
3366
    SVgObj tmpGroup = {0};
19✔
3367
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
19!
3368
  } else {
3369
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
4!
3370
  }
3371

3372
  if (pDb->cfg.replications != newVg2.replica) {
23✔
3373
    SVgObj tmpGroup = {0};
19✔
3374
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
19✔
3375
  } else {
3376
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
4!
3377
  }
3378

3379
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
14!
3380

3381
  // commit db status
3382
  dbObj.vgVersion++;
14✔
3383
  dbObj.updateTime = taosGetTimestampMs();
14✔
3384
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
14!
3385

3386
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
14!
3387
  code = 0;
14✔
3388

3389
_OVER:
24✔
3390
  taosArrayDestroy(pArray);
24✔
3391
  mndTransDrop(pTrans);
24✔
3392
  taosArrayDestroy(dbObj.cfg.pRetensions);
24✔
3393
  TAOS_RETURN(code);
24✔
3394
}
3395

3396
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);
3397

3398
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); }
24✔
3399

3400
#ifndef TD_ENTERPRISE
3401
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; }
3402
#endif
3403

3404
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
24✔
3405
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
3406
  int32_t code = 0;
24✔
3407
  SVgObj  newVg = {0};
24✔
3408
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
24✔
3409
  mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica);
24!
3410
  for (int32_t i = 0; i < newVg.replica; ++i) {
68✔
3411
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
44!
3412
  }
3413

3414
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id));
24!
3415
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id));
24!
3416

3417
  {
3418
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
24✔
3419
    if (pRaw == NULL) {
24!
3420
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3421
      if (terrno != 0) code = terrno;
×
3422
      TAOS_RETURN(code);
×
3423
    }
3424
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
24!
3425
      sdbFreeRaw(pRaw);
×
3426
      TAOS_RETURN(code);
×
3427
    }
3428
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
24✔
3429
    if (code != 0) {
24!
3430
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
3431
      TAOS_RETURN(code);
×
3432
    }
3433
  }
3434

3435
  mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica);
24!
3436
  for (int32_t i = 0; i < newVg.replica; ++i) {
68✔
3437
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
44!
3438
  }
3439
  TAOS_RETURN(code);
24✔
3440
}
3441

3442
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
24✔
3443
                                            SHashObj *pBalancedVgroups) {
3444
  void   *pIter = NULL;
24✔
3445
  int32_t code = -1;
24✔
3446
  SSdb   *pSdb = pMnode->pSdb;
24✔
3447

3448
  while (1) {
16✔
3449
    SVgObj *pVgroup = NULL;
40✔
3450
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
40✔
3451
    if (pIter == NULL) break;
40!
3452
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
40✔
3453
      sdbRelease(pSdb, pVgroup);
15✔
3454
      continue;
16✔
3455
    }
3456

3457
    bool existInSrc = false;
25✔
3458
    bool existInDst = false;
25✔
3459
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
70✔
3460
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
45✔
3461
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
45✔
3462
      if (pGid->dnodeId == pDst->id) existInDst = true;
45!
3463
    }
3464

3465
    if (!existInSrc || existInDst) {
25!
3466
      sdbRelease(pSdb, pVgroup);
1✔
3467
      continue;
1✔
3468
    }
3469

3470
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
24✔
3471
    if (pDb == NULL) {
24!
3472
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3473
      if (terrno != 0) code = terrno;
×
3474
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3475
      goto _OUT;
×
3476
    }
3477

3478
    if (pDb->cfg.withArbitrator) {
24!
3479
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3480
      goto _OUT;
×
3481
    }
3482

3483
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
24✔
3484
    if (code == 0) {
24!
3485
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
24✔
3486
    }
3487

3488
  _OUT:
×
3489
    mndReleaseDb(pMnode, pDb);
24✔
3490
    sdbRelease(pSdb, pVgroup);
24✔
3491
    sdbCancelFetch(pSdb, pIter);
24✔
3492
    break;
24✔
3493
  }
3494

3495
  return code;
24✔
3496
}
3497

3498
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
15✔
3499
  int32_t   code = -1;
15✔
3500
  int32_t   numOfVgroups = 0;
15✔
3501
  STrans   *pTrans = NULL;
15✔
3502
  SHashObj *pBalancedVgroups = NULL;
15✔
3503

3504
  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
15✔
3505
  if (pBalancedVgroups == NULL) goto _OVER;
15!
3506

3507
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
15✔
3508
  if (pTrans == NULL) {
15!
3509
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3510
    if (terrno != 0) code = terrno;
×
3511
    goto _OVER;
×
3512
  }
3513
  mndTransSetSerial(pTrans);
15✔
3514
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
15!
3515
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
15!
3516
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
15✔
3517

3518
  while (1) {
24✔
3519
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
38✔
3520
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
163✔
3521
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
125✔
3522
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
125!
3523
            pDnode->numOfSupportVnodes, pDnode->numOfOtherNodes, mndGetDnodeScore(pDnode, 0, 1));
3524
    }
3525

3526
    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
38✔
3527
    SDnodeObj *pDst = taosArrayGet(pArray, 0);
38✔
3528

3529
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
38✔
3530
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
38✔
3531
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, dstScore,
38!
3532
          pDst->id, dstScore);
3533

3534
    if (srcScore > dstScore - 0.000001) {
38✔
3535
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
24✔
3536
      if (code == 0) {
24!
3537
        pSrc->numOfVnodes--;
24✔
3538
        pDst->numOfVnodes++;
24✔
3539
        numOfVgroups++;
24✔
3540
        continue;
24✔
3541
      } else {
3542
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
×
3543
        break;
×
3544
      }
3545
    } else {
3546
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
14!
3547
      break;
14✔
3548
    }
3549
  }
3550

3551
  if (numOfVgroups <= 0) {
14!
3552
    mInfo("no need to balance vgroup");
×
3553
    code = 0;
×
3554
  } else {
3555
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
14!
3556
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
14!
3557
    code = TSDB_CODE_ACTION_IN_PROGRESS;
14✔
3558
  }
3559

3560
_OVER:
15✔
3561
  taosHashCleanup(pBalancedVgroups);
15✔
3562
  mndTransDrop(pTrans);
15✔
3563
  TAOS_RETURN(code);
15✔
3564
}
3565

3566
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
19✔
3567
  SMnode *pMnode = pReq->info.node;
19✔
3568
  int32_t code = -1;
19✔
3569
  SArray *pArray = NULL;
19✔
3570
  void   *pIter = NULL;
19✔
3571
  int64_t curMs = taosGetTimestampMs();
19✔
3572

3573
  SBalanceVgroupReq req = {0};
19✔
3574
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
19!
3575
    code = TSDB_CODE_INVALID_MSG;
×
3576
    goto _OVER;
×
3577
  }
3578

3579
  mInfo("start to balance vgroup");
19!
3580
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
19✔
3581
    goto _OVER;
1✔
3582
  }
3583

3584
  while (1) {
51✔
3585
    SDnodeObj *pDnode = NULL;
69✔
3586
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
69✔
3587
    if (pIter == NULL) break;
69✔
3588
    if (!mndIsDnodeOnline(pDnode, curMs)) {
53✔
3589
      sdbCancelFetch(pMnode->pSdb, pIter);
2✔
3590
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
3591
      mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
2!
3592
      sdbRelease(pMnode->pSdb, pDnode);
2✔
3593
      goto _OVER;
2✔
3594
    }
3595

3596
    sdbRelease(pMnode->pSdb, pDnode);
51✔
3597
  }
3598

3599
  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
16✔
3600
  if (pArray == NULL) {
16!
3601
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3602
    if (terrno != 0) code = terrno;
×
3603
    goto _OVER;
×
3604
  }
3605

3606
  if (taosArrayGetSize(pArray) < 2) {
16✔
3607
    mInfo("no need to balance vgroup since dnode num less than 2");
1!
3608
    code = 0;
1✔
3609
  } else {
3610
    code = mndBalanceVgroup(pMnode, pReq, pArray);
15✔
3611
  }
3612

3613
  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);
16✔
3614

3615
_OVER:
19✔
3616
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
19✔
3617
    mError("failed to balance vgroup since %s", tstrerror(code));
4!
3618
  }
3619

3620
  taosArrayDestroy(pArray);
19✔
3621
  tFreeSBalanceVgroupReq(&req);
19✔
3622
  TAOS_RETURN(code);
19✔
3623
}
3624

3625
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
68,058,292!
3626

3627
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
16✔
3628
  for (int i = 0; i < pVgroup->replica; i++) {
42✔
3629
    if (pVgroup->vnodeGid[i].dnodeId == dnodeId) return true;
36✔
3630
  }
3631
  return false;
6✔
3632
}
3633

3634
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
28✔
3635
                                     STimeWindow tw, bool metaOnly) {
3636
  SCompactVnodeReq compactReq = {0};
28✔
3637
  compactReq.dbUid = pDb->uid;
28✔
3638
  compactReq.compactStartTime = compactTs;
28✔
3639
  compactReq.tw = tw;
28✔
3640
  compactReq.metaOnly = metaOnly;
28✔
3641
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
28✔
3642

3643
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
28!
3644
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
28✔
3645
  if (contLen < 0) {
28!
3646
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3647
    return NULL;
×
3648
  }
3649
  contLen += sizeof(SMsgHead);
28✔
3650

3651
  void *pReq = taosMemoryMalloc(contLen);
28!
3652
  if (pReq == NULL) {
28!
3653
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3654
    return NULL;
×
3655
  }
3656

3657
  SMsgHead *pHead = pReq;
28✔
3658
  pHead->contLen = htonl(contLen);
28✔
3659
  pHead->vgId = htonl(pVgroup->vgId);
28✔
3660

3661
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
28!
3662
    taosMemoryFree(pReq);
×
3663
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3664
    return NULL;
×
3665
  }
3666
  *pContLen = contLen;
28✔
3667
  return pReq;
28✔
3668
}
3669

3670
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
28✔
3671
                                        STimeWindow tw, bool metaOnly) {
3672
  int32_t      code = 0;
28✔
3673
  STransAction action = {0};
28✔
3674
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
28✔
3675

3676
  int32_t contLen = 0;
28✔
3677
  void   *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly);
28✔
3678
  if (pReq == NULL) {
28!
3679
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3680
    if (terrno != 0) code = terrno;
×
3681
    TAOS_RETURN(code);
×
3682
  }
3683

3684
  action.pCont = pReq;
28✔
3685
  action.contLen = contLen;
28✔
3686
  action.msgType = TDMT_VND_COMPACT;
28✔
3687

3688
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
28!
3689
    taosMemoryFree(pReq);
×
3690
    TAOS_RETURN(code);
×
3691
  }
3692

3693
  TAOS_RETURN(code);
28✔
3694
}
3695

3696
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
28✔
3697
                                    STimeWindow tw, bool metaOnly) {
3698
  TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly));
28!
3699
  return 0;
28✔
3700
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc