• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4996

19 Mar 2026 02:16AM UTC coverage: 72.069% (+0.07%) from 71.996%
#4996

push

travis-ci

web-flow
feat: SQL firewall black/white list (#34798)

461 of 618 new or added lines in 4 files covered. (74.6%)

380 existing lines in 128 files now uncovered.

245359 of 340448 relevant lines covered (72.07%)

135732617.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.92
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndEncryptAlgr.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTopic.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "tmisce.h"
32

33
#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2
34
#define VGROUP_VER_NUMBER                VGROUP_VER_COMPAT_MOUNT_KEEP_VER
35
#define VGROUP_RESERVE_SIZE              60
36
// since 3.3.6.32/3.3.8.6 mountId + keepVersion + keepVersionTime + VGROUP_RESERVE_SIZE = 4 + 8 + 8 + 60 = 80
37
#define DLEN_AFTER_SYNC_CONF_CHANGE_VER 80
38

39
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
40
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
41
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
42
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
43

44
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
45
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
46
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
48

49
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
50
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
51
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
52
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
53
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
54

55
/**
 * Register the vgroup table with the SDB and install the vgroup-related
 * message handlers and SHOW handlers on the given mnode.
 *
 * @param pMnode mnode whose handler tables and SDB are populated.
 * @return the result of sdbSetTable() for the vgroup table.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  // Callbacks the SDB uses for SDB_VGROUP rows (keyed by a 32-bit integer).
  SSdbTable vgroupTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };

  // All of these responses are handed to the same transaction response handler.
  // The registration order matches the original sequence of calls.
  const int32_t transRspTypes[] = {
      TDMT_DND_CREATE_VNODE_RSP,
      TDMT_VND_ALTER_REPLICA_RSP,
      TDMT_VND_ALTER_CONFIG_RSP,
      TDMT_VND_ALTER_CONFIRM_RSP,
      TDMT_VND_SET_KEEP_VERSION_RSP,
      TDMT_VND_ALTER_HASHRANGE_RSP,
      TDMT_DND_DROP_VNODE_RSP,
      TDMT_VND_COMPACT_RSP,
      TDMT_VND_SCAN_RSP,
      TDMT_VND_DISABLE_WRITE_RSP,
      TDMT_SYNC_FORCE_FOLLOWER_RSP,
      TDMT_VND_ALTER_ELECTBASELINE_RSP,
      TDMT_DND_ALTER_VNODE_TYPE_RSP,
      TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP,
      TDMT_SYNC_CONFIG_CHANGE_RSP,
  };
  const int32_t numOfRspTypes = (int32_t)(sizeof(transRspTypes) / sizeof(transRspTypes[0]));
  for (int32_t i = 0; i < numOfRspTypes; ++i) {
    mndSetMsgHandle(pMnode, transRspTypes[i], mndTransProcessRsp);
  }

  // Requests processed by this file.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  // SHOW support: a retrieve callback plus an iterator-cancel callback per table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
98

99
/* Intentionally empty: this function performs no work. */
void mndCleanupVgroup(SMnode *pMnode) { (void)pMnode; }
442,590✔
100

101
/**
 * Serialize a vgroup object into a newly allocated SDB raw record.
 *
 * Fields are written in this order: vgId, createdTime, updateTime, version,
 * hashBegin, hashEnd, dbName, dbUid, isTsma, replica, one dnodeId per replica,
 * syncConfChangeVer, mountVgId, keepVersion, keepVersionTime, and finally
 * VGROUP_RESERVE_SIZE reserved bytes.
 *
 * @param pVgroup vgroup to encode.
 * @return the raw record on success; NULL on failure (terrno is left non-zero
 *         and any allocated record is freed).
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  // NOTE(review): code/lino are not used directly here; presumably the SDB_SET_* macros reference them - confirm.
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Start from a failure code; it is cleared only after every field was written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)

  // Only the dnode id of each member is persisted.
  for (int8_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[r].dnodeId, _OVER)
  }

  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
143

144
/**
 * Rebuild a vgroup row from an SDB raw record.
 *
 * The mandatory prefix (vgId .. per-replica dnodeId) is always read. The
 * trailing fields (syncConfChangeVer, mountVgId, keepVersion, keepVersionTime,
 * reserved bytes) are each read only when enough bytes remain in the record.
 *
 * @param pRaw raw record to decode.
 * @return the decoded row on success; NULL on failure (terrno is left non-zero
 *         and the partially built row is freed).
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly here; presumably the SDB_GET_* macros reference them - confirm.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int8_t   sver = 0;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  // Start from a failure code; it is cleared only after decoding completed.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  // Accept only versions 1..VGROUP_VER_NUMBER.
  if (!(sver >= 1 && sver <= VGROUP_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)

  for (int8_t r = 0; r < pVgroup->replica; ++r) {
    SVnodeGid *pMember = &pVgroup->vnodeGid[r];
    SDB_GET_INT32(pRaw, dataPos, &pMember->dnodeId, _OVER)
    // A single-replica vgroup has its only member marked as leader right away.
    if (pVgroup->replica == 1) {
      pMember->syncState = TAOS_SYNC_STATE_LEADER;
    }
    pMember->snapSeq = -1;
  }

  if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }

  // Bytes left after syncConfChangeVer; consulted below for records older than
  // VGROUP_VER_COMPAT_MOUNT_KEEP_VER.
  int32_t dlenAfterSyncConfChangeVer = pRaw->dataLen - dataPos;

  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersion, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER)
  }
  if (dataPos + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  }

  if (sver < VGROUP_VER_COMPAT_MOUNT_KEEP_VER) {
    // For older records, a tail of exactly DLEN_AFTER_SYNC_CONF_CHANGE_VER bytes
    // resets mountVgId; keepVersion/keepVersionTime always get their defaults.
    if (dlenAfterSyncConfChangeVer == DLEN_AFTER_SYNC_CONF_CHANGE_VER) {
      pVgroup->mountVgId = 0;
    }
    pVgroup->keepVersion = -1;
    pVgroup->keepVersionTime = 0;
  }

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRow;
  }

  mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
222

223
/**
 * Validate a raw vgroup record carried by a transaction: decode it and reject
 * it when the SDB's current maximum vgroup id is greater than the record's id.
 *
 * @param pMnode mnode owning the SDB.
 * @param pTrans transaction the record belongs to (its id is used for logging).
 * @param pRaw   raw vgroup record to validate.
 * @return 0 when accepted; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if decoding failed; -1 if the id check failed.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int     code = -1;

  SSdbRow *pRow = mndVgroupActionDecode(pRaw);
  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }
  if (pRow == NULL || pVgroup == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (maxVgId <= pVgroup->vgId) {
    code = 0;
  } else {
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  }

_OVER:
  // The decoded row was only needed for the check; release it on every path.
  if (pVgroup != NULL) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
254

255
/**
 * SDB insert callback for vgroup rows; it only emits a trace line.
 *
 * @param pSdb    unused.
 * @param pVgroup row being inserted.
 * @return always 0.
 */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
259

260
/**
 * SDB delete callback for vgroup rows; it only emits a trace line.
 *
 * @param pSdb    unused.
 * @param pVgroup row being deleted.
 * @return always 0.
 */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
264

265
/**
 * SDB update callback: merge a newly committed vgroup row (pNew) into the row
 * currently held in memory (pOld).
 *
 * Persisted attributes flow from pNew to pOld. Runtime-only state flows the
 * other way first: for every member of pNew whose dnodeId also exists in pOld,
 * the sync/runtime fields are copied from the old member into the new one, and
 * the vgroup-level statistics are copied from pOld into pNew. After that the
 * whole member array of pNew is copied over pOld.
 *
 * @param pSdb unused.
 * @param pOld in-memory row that is updated in place.
 * @param pNew freshly decoded row; also modified (it receives runtime state).
 * @return always 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  (void)pSdb;
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Capture the old member count BEFORE pOld->replica is overwritten. The
  // matching loop below must scan every old member; bounding it by the new
  // count would skip surviving members whenever the vgroup shrinks.
  const int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;

  // Carry the runtime state of every surviving member over to the new row.
  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
        pNewGid->learnerProgress = pOldGid->learnerProgress;
        pNewGid->snapSeq = pOldGid->snapSeq;
        pNewGid->syncTotalIndex = pOldGid->syncTotalIndex;
      }
    }
  }

  // Vgroup-level statistics are not part of the encoded row; keep the old values.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  // Publish the merged member array and the remaining persisted attributes.
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);
  return 0;
}
304

305
/**
 * Acquire the vgroup with the given id from the SDB.
 *
 * @param pMnode mnode owning the SDB.
 * @param vgId   id of the vgroup to look up.
 * @return the acquired vgroup, or NULL. When the SDB reports
 *         TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 *         TSDB_CODE_MND_VGROUP_NOT_EXIST; other terrno values are left as is.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
313

314
/**
 * Hand a vgroup object back to the SDB via sdbRelease().
 *
 * @param pMnode  mnode owning the SDB.
 * @param pVgroup vgroup to release; passed to sdbRelease() unchanged.
 */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
133,001,356✔
318

319
/**
 * Build a serialized create-vnode request for one member of a vgroup.
 *
 * The request copies the database configuration from pDb, the identity and
 * hash range from pVgroup, and the endpoint of every vgroup member (voters go
 * into replicas[], all other roles into learnerReplicas[]).
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented, including on the
 * failure paths that follow the increment.
 *
 * @param pMnode   mnode used to look up dnodes and encryption algorithm names.
 * @param pDnode   dnode the request is built for; it must be a member of pVgroup.
 * @param pDb      database the vgroup belongs to.
 * @param pVgroup  vgroup being created.
 * @param pContLen receives the length of the returned buffer on success.
 * @return buffer allocated with taosMemoryMalloc(), or NULL on failure.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};

  // Identity of the vnode.
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;

  // Database configuration, copied field by field.
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;

  // Membership counters and self indexes are filled in by the loop below.
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;

  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  // The algorithm name stays all-zero unless the database has encryption configured.
  memset(createReq.encryptAlgrName, 0, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pDb->cfg.encryptAlgorithm > 0) {
    mndGetEncryptOsslAlgrNameById(pMnode, pDb->cfg.encryptAlgorithm, createReq.encryptAlgrName);
  }
  createReq.isAudit = pDb->cfg.isAudit ? 1 : 0;
  createReq.allowDrop = pDb->cfg.allowDrop;
  createReq.secureDelete = pDb->cfg.secureDelete;
  int32_t code = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Next free slot in the array that matches the member's role.
    SReplica *pReplica = isVoter ? &createReq.replicas[createReq.replica]
                                 : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // Record the target dnode's position inside its own role array.
    if (pDnode->id == pVgid->dnodeId) {
      if (isVoter) {
        createReq.selfIndex = createReq.replica;
      } else {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
    }

    if (isVoter) {
      createReq.replica++;
    } else {
      createReq.learnerReplica++;
    }
  }

  // The target dnode must appear among the members.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // Re-read the version from the vgroup right before logging and serializing.
  createReq.changeVersion = pVgroup->syncConfChangeVer;

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t r = 0; r < createReq.replica; ++r) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, r, createReq.replicas[r].fqdn, createReq.replicas[r].port);
  }
  for (int32_t r = 0; r < createReq.learnerReplica; ++r) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, r, createReq.learnerReplicas[r].fqdn,
          createReq.learnerReplicas[r].port);
  }

  // First pass with a NULL buffer yields the encoded size.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code >= 0) {
    *pContLen = contLen;
    return pReq;
  }

  terrno = TSDB_CODE_APP_ERROR;
  taosMemoryFree(pReq);
  mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
  return NULL;
}
455

456
/**
 * Build an alter-vnode-config message: an SMsgHead followed by the serialized
 * SAlterVnodeConfigReq filled from the database configuration.
 *
 * @param pMnode   unused here.
 * @param pDb      database whose configuration is sent.
 * @param pVgroup  target vgroup (supplies vgVersion and the vgId in the header).
 * @param pContLen receives the total message length (header + body) on success.
 * @return buffer allocated with taosMemoryMalloc(), or NULL on failure with
 *         terrno set to TSDB_CODE_OUT_OF_MEMORY.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  (void)pMnode;

  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;
  alterReq.allowDrop = (int8_t)pDb->cfg.allowDrop;
  alterReq.secureDelete = pDb->cfg.secureDelete;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // First pass with a NULL buffer yields the size of the serialized body only.
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Header fields are stored in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts right after the header, so only bodyLen bytes are available
  // there. Passing the full contLen would overstate the capacity by
  // sizeof(SMsgHead).
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
508

509
/**
 * Build a serialized alter-vnode-replica request for one member of a vgroup.
 *
 * Voters are listed in replicas[], all other roles in learnerReplicas[].
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented, including on the
 * failure paths.
 *
 * @param pMnode   mnode used to look up dnodes.
 * @param pDb      database the vgroup belongs to (supplies `strict`).
 * @param pVgroup  vgroup whose membership is described.
 * @param dnodeId  dnode the request is built for; it must be a member of pVgroup.
 * @param pContLen receives the length of the returned buffer on success.
 * @return buffer allocated with taosMemoryMalloc(), or NULL on failure.
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.strict = pDb->cfg.strict;
  alterReq.replica = 0;
  alterReq.learnerReplica = 0;
  alterReq.selfIndex = -1;
  alterReq.learnerSelfIndex = -1;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Claim the next slot of the role-specific array (the counter is bumped at once).
    SReplica *pReplica = isVoter ? &alterReq.replicas[alterReq.replica++]
                                 : &alterReq.learnerReplicas[alterReq.learnerReplica++];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // NOTE(review): the self index is the position in vnodeGid[] (v), not the
    // position inside replicas[]/learnerReplicas[]; these differ once voters and
    // learners are interleaved - confirm what the receiver expects.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = v;
      } else {
        alterReq.learnerSelfIndex = v;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t r = 0; r < alterReq.replica; ++r) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, r, alterReq.replicas[r].fqdn, alterReq.replicas[r].port);
  }
  for (int32_t r = 0; r < alterReq.learnerReplica; ++r) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, r, alterReq.learnerReplicas[r].fqdn,
          alterReq.learnerReplicas[r].port);
  }

  // The target dnode must appear among the members.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass with a NULL buffer yields the encoded size.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) >= 0) {
    *pContLen = contLen;
    return pReq;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
  taosMemoryFree(pReq);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
591

592
/**
 * Build a serialized check-learner-catchup request for one member of a vgroup.
 *
 * Voters are listed in replicas[], all other roles in learnerReplicas[].
 *
 * NOTE(review): the request is encoded with tSerializeSAlterVnodeReplicaReq();
 * presumably SCheckLearnCatchupReq shares the layout of SAlterVnodeReplicaReq -
 * confirm against the type definitions.
 *
 * @param pMnode   mnode used to look up dnodes.
 * @param pDb      database the vgroup belongs to (supplies `strict`).
 * @param pVgroup  vgroup whose membership is described.
 * @param dnodeId  dnode the request is built for; it must be a member of pVgroup.
 * @param pContLen receives the length of the returned buffer on success.
 * @return buffer allocated with taosMemoryMalloc(), or NULL on failure.
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {0};
  req.vgId = pVgroup->vgId;
  req.strict = pDb->cfg.strict;
  req.replica = 0;
  req.learnerReplica = 0;
  req.selfIndex = -1;
  req.learnerSelfIndex = -1;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Claim the next slot of the role-specific array (the counter is bumped at once).
    SReplica *pReplica = isVoter ? &req.replicas[req.replica++] : &req.learnerReplicas[req.learnerReplica++];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // NOTE(review): the self index is the position in vnodeGid[] (v), not the
    // position inside replicas[]/learnerReplicas[] - confirm what the receiver expects.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = v;
      } else {
        req.learnerSelfIndex = v;
      }
    }
  }

  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t r = 0; r < req.replica; ++r) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, r, req.replicas[r].fqdn, req.replicas[r].port);
  }
  for (int32_t r = 0; r < req.learnerReplica; ++r) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, r, req.learnerReplicas[r].fqdn, req.learnerReplicas[r].port);
  }

  // The target dnode must appear among the members.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass with a NULL buffer yields the encoded size.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) >= 0) {
    *pContLen = contLen;
    return pReq;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
  taosMemoryFree(pReq);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
669

670
/*
 * Encode a SDisableVnodeWriteReq (disable = 1) for vgroup `vgId`.
 *
 * pMnode and pDb are not read by this function.
 *
 * Returns a buffer obtained from taosMemoryMalloc holding the encoded request
 * and stores its length in *pContLen. On any failure NULL is returned, terrno
 * is set to TSDB_CODE_OUT_OF_MEMORY and *pContLen is left untouched.
 */
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {0};
  disableReq.vgId = vgId;
  disableReq.disable = 1;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // The call with a NULL buffer only yields the length used for the allocation below.
  int32_t encodedLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
  void   *pBuf = NULL;
  if (encodedLen >= 0) {
    pBuf = taosMemoryMalloc(encodedLen);
  }
  if (pBuf == NULL) {
    // Both a negative length and a failed allocation are reported the same way.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, encodedLen, &disableReq) < 0) {
    // Log first: terrstr() must be evaluated before terrno is overwritten below.
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = encodedLen;
  return pBuf;
}
698

699
/*
 * Encode a SAlterVnodeHashRangeReq that carries the hash range of `pVgroup`
 * with `srcVgId` as source and pVgroup->vgId as destination.
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented and the new value is
 * sent as the request's changeVersion.
 *
 * Returns a buffer obtained from taosMemoryMalloc and stores its length in
 * *pContLen. On failure NULL is returned and terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY. pMnode is not read by this function.
 */
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq alterReq = {0};
  alterReq.srcVgId = srcVgId;
  alterReq.dstVgId = pVgroup->vgId;
  alterReq.hashBegin = pVgroup->hashBegin;
  alterReq.hashEnd = pVgroup->hashEnd;
  // Bump the vgroup's config-change version and use the bumped value.
  pVgroup->syncConfChangeVer++;
  alterReq.changeVersion = pVgroup->syncConfChangeVer;

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // The call with a NULL buffer only yields the length used for the allocation below.
  int32_t encodedLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
  void   *pBuf = NULL;
  if (encodedLen >= 0) {
    pBuf = taosMemoryMalloc(encodedLen);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, encodedLen, &alterReq) < 0) {
    // Log first: terrstr() must be evaluated before terrno is overwritten below.
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = encodedLen;
  return pBuf;
}
731

732
/*
 * Encode a SDropVnodeReq asking dnode `pDnode` to drop its vnode of `pVgroup`.
 * The request carries the dnode id, the vgroup id, and the db name and uid
 * taken from `pDb`.
 *
 * Returns a buffer obtained from taosMemoryMalloc and stores its length in
 * *pContLen. On failure NULL is returned and terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY. pMnode is not read by this function.
 */
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);

  // The call with a NULL buffer only yields the length used for the allocation below.
  int32_t encodedLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  void   *pBuf = NULL;
  if (encodedLen >= 0) {
    pBuf = taosMemoryMalloc(encodedLen);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, encodedLen, &dropReq) < 0) {
    // Log first: terrstr() must be evaluated before terrno is overwritten below.
    mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = encodedLen;
  return pBuf;
}
761

762
/*
 * sdbTraverse callback for SDB_DNODE: zero the per-dnode vnode and
 * "other node" counters. Always returns true; p1..p3 are ignored.
 */
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pTarget = (SDnodeObj *)pObj;
  pTarget->numOfOtherNodes = 0;
  pTarget->numOfVnodes = 0;
  return true;
}
768

769
/*
 * sdbTraverse callback for SDB_DNODE that collects candidate dnodes.
 *
 *   p1: SArray of SDnodeObj receiving a copy of every accepted dnode
 *   p2: pointer to an int32_t dnode id that must be skipped
 *   p3: optional SArray of int32_t dnode ids; when non-NULL only dnodes whose
 *       id is listed are considered, and an empty list rejects every dnode
 *       (TS-6191)
 *
 * For each dnode that passes the filters, numOfVnodes and memUsed are
 * refreshed, numOfOtherNodes is bumped if the dnode also hosts an mnode, and
 * the dnode is pushed into p1 when it is online and supports at least one
 * vnode.
 *
 * Returns false only when pushing into p1 fails; true otherwise.
 */
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pArray = p1;
  int32_t    exceptDnodeId = *(int32_t *)p2;
  SArray    *dnodeList = p3;

  if (exceptDnodeId == pDnode->id) {
    return true;
  }

  if (dnodeList != NULL) {
    int32_t dnodeListSize = taosArrayGetSize(dnodeList);
    bool    inDnodeList = false;
    for (int32_t index = 0; index < dnodeListSize; ++index) {
      int32_t dnodeId = *(int32_t *)taosArrayGet(dnodeList, index);
      if (pDnode->id == dnodeId) {
        inDnodeList = true;
        break;  // one match is enough, no need to scan the remaining ids
      }
    }
    // Also taken when the list is empty, so an empty list skips the dnode (TS-6191).
    if (!inDnodeList) {
      return true;
    }
  }

  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  if (isMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (online && pDnode->numOfSupportVnodes > 0) {
    if (taosArrayPush(pArray, pDnode) == NULL) return false;
  }
  return true;
}
815

816
/*
 * Return true when `dnodeId` occurs in `dnodeList`, which is read as an array
 * of int32_t ids. A list whose reported size is zero never matches.
 */
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t remaining = taosArrayGetSize(dnodeList);
  int32_t pos = 0;
  while (pos < remaining) {
    if (*(int32_t *)TARRAY_GET_ELEM(dnodeList, pos) == dnodeId) {
      return true;
    }
    ++pos;
  }
  return false;
}
826

827
#ifdef TD_ENTERPRISE
828
/*
 * Score used by the dual-replica dnode ordering.
 *
 * The magnitude is (vnodes + otherNodes * ratio + additionDnodes) divided by
 * numOfSupportVnodes. The sign is flipped to negative when the dnode already
 * hosts at least one vnode, so such dnodes compare lower than dnodes without
 * vnodes.
 */
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;
  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
833

834
/*
 * Comparator over SDnodeObj: order by mndGetDnodeScore1 (ratio 0.9, no extra
 * dnodes) ascending, and break exact score ties by ascending dnode id.
 * Returns -1, 0 or 1.
 */
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhsScore = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhsScore = mndGetDnodeScore1(pDnode2, 0, 0.9);

  if (lhsScore != rhsScore) {
    return lhsScore > rhsScore ? 1 : -1;
  }

  // Equal scores: fall back to the dnode id.
  if (pDnode1->id == pDnode2->id) {
    return 0;
  }
  return pDnode1->id > pDnode2->id ? 1 : -1;
}
845

846
/*
 * sdbTraverse callback for SDB_DNODE used by the dual-replica check.
 *
 * Refreshes numOfVnodes, bumps numOfOtherNodes when the dnode also hosts an
 * mnode, and pushes a copy of the dnode into the SArray passed as p1 when it
 * supports at least one vnode. The dnode's online state is not consulted.
 * p2 and p3 are ignored.
 *
 * Returns false only when the push fails.
 */
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = (SDnodeObj *)pObj;
  SArray    *pList = (SArray *)p1;

  bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pList, pDnode) != NULL;
}
862

863
// TS-6191
864
/*
 * Build the restricted dnode candidate list used when
 * grantCheckDualReplicaDnodes() reports true for this mnode (TS-6191).
 *
 * When that check is false nothing is done and *ppDnodeList is left untouched.
 * Otherwise a new SArray of SDnodeObj is allocated and stored in *ppDnodeList
 * (this function does not free it on any path), filled with every dnode that
 * supports vnodes, sorted with mndCompareDnodeVnodes1, and then trimmed:
 *   - no caller list: keep at most the first two entries;
 *   - caller list given: if at least two dnodes already host vnodes keep only
 *     the first two, then drop every entry not in `dnodeList`. If exactly one
 *     dnode hosts vnodes and it did not survive that filter, the last
 *     remaining entry is dropped as well.
 *
 * Returns 0, or terrno when the array cannot be allocated.
 */
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCandidates = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCandidates;

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCandidates, NULL, NULL);

  int32_t total = taosArrayGetSize(pCandidates);
  if (total <= 0) {
    TAOS_RETURN(code);
  }
  if (total > 1) taosArraySort(pCandidates, (__compar_fn_t)mndCompareDnodeVnodes1);

  int32_t dnodeListSize = taosArrayGetSize(dnodeList);
  if (dnodeListSize <= 0) {
    // No caller restriction: only the first two sorted dnodes stay eligible.
    if (total > 2) taosArrayRemoveBatch(pCandidates, 2, total - 2, NULL);
    TAOS_RETURN(code);
  }

  // Count the leading run of dnodes that already host vnodes.
  int32_t nDnodesWithVnodes = 0;
  while (nDnodesWithVnodes < total) {
    SDnodeObj *pDnode = TARRAY_GET_ELEM(pCandidates, nDnodesWithVnodes);
    if (pDnode->numOfVnodes <= 0) {
      break;
    }
    ++nDnodesWithVnodes;
  }

  int32_t soleDnodeId = -1;
  if (nDnodesWithVnodes == 1) {
    soleDnodeId = ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, 0))->id;
  } else if (nDnodesWithVnodes >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCandidates, 2, total - 2, NULL);
  }

  // Drop every candidate the caller did not list; only advance when nothing was removed.
  int32_t i = 0;
  while (i < TARRAY_SIZE(pCandidates)) {
    SDnodeObj *pDnode = taosArrayGet(pCandidates, i);
    if (isDnodeInList(dnodeList, pDnode->id)) {
      ++i;
    } else {
      taosArrayRemove(pCandidates, i);
    }
  }

  if (nDnodesWithVnodes == 1) {
    SDnodeObj *pFirst = taosArrayGet(pCandidates, 0);
    if (pFirst != NULL && pFirst->id != soleDnodeId) {
      // the first dnode is not in dnodeList, remove the last element
      taosArrayRemove(pCandidates, taosArrayGetSize(pCandidates) - 1);
    }
  }

  TAOS_RETURN(code);
}
922
#endif
923

924
/*
 * Build an SArray of SDnodeObj copies describing the dnodes that can receive
 * new vnodes.
 *
 *   exceptDnodeId: dnode id to leave out of the result
 *   dnodeList:     optional list of int32_t dnode ids restricting the
 *                  candidates; a NULL or empty list means no restriction
 *
 * With TD_ENTERPRISE the dual-replica check may supply its own restricted
 * list, which then takes precedence over `dnodeList`.
 *
 * Returns the new array, which this function does not free, or NULL on failure.
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY when the result array itself cannot
 * be allocated.
 */
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);

  SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // An empty caller list is treated like no list at all.
  SArray *tDnodeList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
  SArray *pDnodeList = NULL;
#ifdef TD_ENTERPRISE
  if (0 != mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, tDnodeList, &pDnodeList)) {
    taosArrayDestroy(pArray);
    return NULL;
  }
#endif

  SArray *pFilter = (pDnodeList != NULL) ? pDnodeList : tDnodeList;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, pFilter);

  mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray));
  int32_t idx = 0;
  while (idx < (int32_t)taosArrayGetSize(pArray)) {
    SDnodeObj *pDnode = taosArrayGet(pArray, idx);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
    ++idx;
  }

  taosArrayDestroy(pDnodeList);
  return pArray;
}
955

956
/*
 * Three-way comparison of two dnode ids passed by pointer.
 * Returns -1, 0 or 1 for less than, equal, greater than.
 */
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  const int32_t lhs = *dnode1Id;
  const int32_t rhs = *dnode2Id;
  return (lhs > rhs) - (lhs < rhs);
}
962

963
/*
 * Score of a dnode:
 * (numOfVnodes + numOfOtherNodes * ratio + additionDnodes) / numOfSupportVnodes.
 * No guard against numOfSupportVnodes being zero is applied here.
 */
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedNodes = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float capacity = pDnode->numOfSupportVnodes;
  return weightedNodes / capacity;
}
967

968
/*
 * Comparator over SDnodeObj ordering by mndGetDnodeScore (ratio 0.9, no extra
 * dnodes) ascending. Dnodes with exactly equal scores compare as equal.
 */
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhsScore = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhsScore = mndGetDnodeScore(pDnode2, 0, 0.9);

  if (lhsScore != rhsScore) {
    return lhsScore > rhsScore ? 1 : -1;
  }
  return 0;
}
976

977
/*
 * Sort the first `replica` entries of pVgroup->vnodeGid in place by ascending
 * dnodeId (bubble sort; entries with equal ids keep their relative order).
 */
void mndSortVnodeGid(SVgObj *pVgroup) {
  for (int32_t pass = 0; pass < pVgroup->replica; ++pass) {
    // After each pass the largest remaining id has settled at the end.
    int32_t unsortedEnd = pVgroup->replica - 1 - pass;
    for (int32_t k = 0; k < unsortedEnd; ++k) {
      if (pVgroup->vnodeGid[k + 1].dnodeId < pVgroup->vnodeGid[k].dnodeId) {
        TSWAP(pVgroup->vnodeGid[k], pVgroup->vnodeGid[k + 1]);
      }
    }
  }
}
2,639,574✔
986

987
/*
 * Pick dnodes for every replica of `pVgroup` from `pArray`.
 *
 * `pArray` (SDnodeObj copies) is sorted in place by mndCompareDnodeVnodes and
 * the first `replica` entries are used. Each chosen entry has its memUsed and
 * numOfVnodes updated in the array, so later calls on the same array see the
 * new load. The vgroup's vnodeGid slots get the dnode id and an initial sync
 * state (leader for one replica, follower otherwise) and are finally sorted by
 * dnode id.
 *
 * Returns 0 on success, or TSDB_CODE_MND_NO_ENOUGH_DNODES,
 * TSDB_CODE_MND_NO_ENOUGH_VNODES or TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE.
 * Updates made for earlier replicas are not rolled back on failure.
 */
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t idx = 0;
  while (idx < (int32_t)taosArrayGetSize(pArray)) {
    SDnodeObj *pDnode = taosArrayGet(pArray, idx);
    mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9));
    ++idx;
  }

  int32_t size = taosArrayGetSize(pArray);
  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    // The dnode must keep a positive memory margin after hosting this vnode.
    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1036

1037
/*
 * Initialise `pVgroup` as a single-replica TSMA vgroup of database `pDb` and
 * assign it to an available dnode.
 *
 * The vgroup id is taken from sdbGetMaxId(SDB_VGROUP); keepVersion is set to
 * -1 (WAL keep version disabled).
 *
 * Returns 0 on success. On failure the error code is returned (and stored in
 * terrno by TAOS_RETURN): the code from mndBuildDnodesArray /
 * mndGetAvailableDnode, or TSDB_CODE_MND_RETURN_VALUE_NULL when the dnode
 * array could not be built and terrno was not set.
 *
 * The temporary dnode array is released on every path, including the dnode
 * selection failure path.
 */
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  // The array is only needed for dnode selection; free it before checking the
  // result so the failure path cannot leak it.
  taosArrayDestroy(pArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1063

1064
/*
 * Allocate and initialise all vgroups of database `pDb`.
 *
 * pDb->cfg.numOfVgroups SVgObj entries are allocated. Each gets a fresh vgroup
 * id (starting at max(sdbGetMaxId(SDB_VGROUP), 2)), an equal slice of the
 * uint32 hash space (the last vgroup absorbs the remainder up to UINT32_MAX)
 * and, via mndGetAvailableDnode, one dnode per replica.
 *
 *   dnodeList: optional list of int32_t dnode ids restricting the candidates
 *
 * On success *ppVgroups receives the array, which this function does not free,
 * and 0 is returned. On failure the array is freed, *ppVgroups is left untouched
 * and the error code is returned.
 *
 * NOTE(review): numOfVgroups is used as a divisor without a zero check here;
 * presumably validated by the caller - confirm.
 */
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time, not that of the first array element.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      // The last vgroup takes whatever the integer division left over.
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1129

1130
/*
 * Build the endpoint set of `pVgroup`: one endpoint (fqdn and port) per replica
 * whose dnode can be acquired. `inUse` is pointed at the slot of a replica in
 * leader or assigned-leader state; the set is passed through epsetSort before
 * being returned. Replicas whose dnode cannot be acquired are skipped.
 */
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      bool leads = pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER;
      if (leads) {
        // numOfEps is the index the endpoint added below will occupy.
        epset.inUse = epset.numOfEps;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  epsetSort(&epset);
  return epset;
}
1151

1152
/*
 * Look up vgroup `vgId` and build its endpoint set: one endpoint (fqdn and
 * port) per replica whose dnode can be acquired, with `inUse` pointed at the
 * slot of a replica in leader or assigned-leader state.
 *
 * Returns an all-zero set when the vgroup cannot be acquired. This function
 * does not call epsetSort on the result.
 */
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet epset = {0};

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  int32_t v = 0;
  while (v < pVgroup->replica) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v++];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode == NULL) {
      continue;
    }

    bool leads = pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER;
    if (leads) {
      // numOfEps is the index the endpoint added below will occupy.
      epset.inUse = epset.numOfEps;
    }

    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
    }
    mndReleaseDnode(pMnode, pDnode);
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1176

1177
/*
 * Show-retrieve callback producing up to `rows` vgroup rows into `pBlock`.
 *
 * Iteration state is kept in pShow->pIter. When pShow->db is non-empty only
 * vgroups of that database are emitted. Per row the columns are written in
 * this order: vgId, db name, table count, then four replica slots of
 * (dnode id, role, apply/commit string) with NULLs for unused slots, followed
 * by the ready flag, cache usage, cached-table count, isTsma, mountVgId,
 * keepVersion and keepVersionTime.
 *
 * Returns the number of rows written, or an error code via TAOS_RETURN when
 * `code` is non-zero at the end.
 */
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  // NOTE(review): several locals below are never referenced by name in this function. They are presumably
  // consumed implicitly by the MND_SHOW_CHECK_* / COL_DATA_SET_VAL_GOTO macros, so names are kept - confirm.
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SDbObj   *pDb = NULL;
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VGROUPS, PRIV_OBJ_DB, 0, _OVER);

  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      goto _OVER;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    // Restrict the output to the requested database, if any.
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VGROUPS, _OVER);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      // sdbCancelFetch(pSdb, pShow->pIter);
      goto _OVER;
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    bool       isReady = false;
    bool       isLeaderRestored = false;
    bool       hasFollowerRestored = false;
    ESyncState leaderState = TAOS_SYNC_STATE_OFFLINE;
    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i >= pVgroup->replica) {
        // Unused replica slot: all three of its columns are NULL.
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        continue;
      }

      int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
      COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

      bool       exist = false;
      bool       online = false;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
      if (pDnode != NULL) {
        exist = true;
        online = mndIsDnodeOnline(pDnode, curMs);
        mndReleaseDnode(pMnode, pDnode);
      }

      // Role text: "dropping" when the dnode is gone, "offline" when it is not online,
      // otherwise the textual sync state.
      char buf1[20] = {0};
      char role[20] = "offline";
      if (!exist) {
        tstrncpy(role, "dropping", sizeof(role));
      } else if (online) {
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
            pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
          if (pVgroup->vnodeGid[i].syncRestore) {
            isLeaderRestored = true;
          }
          leaderState = pVgroup->vnodeGid[i].syncState;
        } else if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_FOLLOWER && pVgroup->vnodeGid[i].syncRestore) {
          hasFollowerRestored = true;
        }
        snprintf(role, sizeof(role), "%s", syncStr(pVgroup->vnodeGid[i].syncState));
      }
      STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

      char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
      char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};

      bool isLearner = (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER);
      bool inSnapshot = (pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END);

      if (isLearner) {
        if (pDb != NULL) {
          mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
        } else {
          mInfo("db:null, learner progress:%d", pVgroup->vnodeGid[i].learnerProgress);
        }

        if (inSnapshot) {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)",
                   pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
                   pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].snapSeq,
                   pVgroup->vnodeGid[i].learnerProgress);
        } else {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)",
                   pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
                   pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].learnerProgress);
        }
      } else if (inSnapshot) {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)",
                 pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
                 pVgroup->vnodeGid[i].snapSeq);
      } else {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
                 pVgroup->vnodeGid[i].syncCommitIndex);
      }

      STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
    }

    // Readiness: with 3+ replicas, or 2 replicas under a regular leader, both a restored leader and a
    // restored follower are required; with an assigned leader or a single replica the leader alone suffices.
    if (pVgroup->replica >= 3) {
      isReady = isLeaderRestored && hasFollowerRestored;
    } else if (pVgroup->replica == 2) {
      if (leaderState == TAOS_SYNC_STATE_LEADER) {
        isReady = isLeaderRestored && hasFollowerRestored;
      } else if (leaderState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        isReady = isLeaderRestored;
      }
    } else {
      isReady = isLeaderRestored;
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&isReady, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int64_t cacheUsage = (int64_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersion, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersionTime, false, pVgroup, pShow->pIter, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1376

1377
/*
 * Cancel an in-progress SDB_VGROUP fetch identified by `pIter` by forwarding
 * it to sdbCancelFetchByType on this mnode's sdb.
 */
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1381

1382
// Traverse callback used by mndGetVnodesNum.
//   pObj : the vgroup being visited
//   p1   : pointer to the dnode id (int32_t) to match
//   p2   : pointer to the running counter (int32_t), incremented once per replica placed on that dnode
//   p3   : unused
// Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int32_t      *pCounter = (int32_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnode) {
      *pCounter += 1;
    }
    ++idx;
  }

  return true;
}
1395

1396
// Counts the vnode replicas located on dnodeId by running mndGetVnodesNumFp over the SDB_VGROUP objects.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t total = 0;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);

  return total;
}
1401

1402
// Computes the memory figure of one vgroup from its database configuration.
//   pDbInput : database object to use; when NULL the database is acquired via pVgroup->dbName and released
//              again before returning
//   pVgroup  : vgroup whose dbName / vgId are used for lookup and logging
// Returns cfg.buffer * 1024 * 1024 + cfg.pages * cfg.pageSize * 1024, plus cfg.cacheLastSize * 1024 * 1024 when
// cfg.cacheLast > 0. Returns 0 when no database object is available.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool acquiredHere = (pDbInput == NULL);
  SDbObj    *pDb = acquiredHere ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;
  int64_t    total = 0;

  if (pDb != NULL) {
    const int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    const int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    const int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;

    total = buffer + cache;
    if (pDb->cfg.cacheLast > 0) {
      total += cacheLast;
    }
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  // Only drop the reference this function took itself; a caller-supplied object stays with the caller.
  if (acquiredHere) {
    mndReleaseDb(pMnode, pDb);
  }
  return total;
}
1426

1427
// Traverse callback used by mndGetVnodesMemory.
//   pObj : the vgroup being visited
//   p1   : pointer to the dnode id (int32_t) to match
//   p2   : pointer to the running sum (int64_t); mndGetVgroupMemory() of this vgroup is added once per replica
//          placed on that dnode
//   p3   : unused
// Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int64_t      *pSum = (int64_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnode) {
      *pSum += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
    ++idx;
  }

  return true;
}
1440

1441
// Sums mndGetVgroupMemory() over the vnode replicas located on dnodeId by running mndGetVnodeMemroyFp over the
// SDB_VGROUP objects.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  SSdb   *pSdb = pMnode->pSdb;
  int64_t total = 0;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);

  return total;
}
1446

1447
/*
 * Formats an estimated restore duration as "hours:minutes:seconds" into restoreStr.
 *
 *   rate           apply rate; applyCount / rate is divided by 1000 to obtain whole seconds
 *   applyCount     number of entries still to be applied
 *   restoreStr     output buffer, written with snprintf (always NUL-terminated when restoreStrSize > 0)
 *   restoreStrSize capacity of restoreStr in bytes
 *
 * A rate that is not strictly positive (zero, negative or NaN) produces "0:0:0": no meaningful estimate exists
 * and dividing by it would yield garbage. The quotient is clamped to the int64_t range before conversion because
 * converting an out-of-range double to an integer is undefined behaviour in C.
 */
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  // !(rate > 0) is also true for NaN, which a plain "rate <= 0" comparison would let through.
  if (!(rate > 0)) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  const double estimate = (double)applyCount / rate;
  int64_t      costTime = 0;
  if (estimate >= 9223372036854775808.0) {  // 2^63, first value above INT64_MAX
    costTime = INT64_MAX;
  } else if (estimate <= -9223372036854775808.0) {  // -2^63 == INT64_MIN
    costTime = INT64_MIN;
  } else {
    costTime = (int64_t)estimate;
  }

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1461

1462
/*
 * Retrieve callback that fills pBlock with one row per vnode replica.
 *
 * Vgroups are fetched from the sdb through pShow->pIter; for every replica of a vgroup a row is written with, in
 * column order: dnode id, vgroup id, db name, sync state, role time, start time, sync-restore flag, estimated
 * restore time string, unapplied entry count, buffer segment used, buffer segment size.
 *
 * Fetching stops once fewer than TSDB_MAX_REPLICA rows of headroom remain in `rows`, or the iterator is exhausted.
 *
 * Returns the number of rows written (also added to pShow->numOfRows) on success, or the error code when a
 * privilege check or column write fails.
 *
 * NOTE(review): several locals below are never named in this function's own statements; they appear to be
 * consumed implicitly by the MND_SHOW_CHECK_* and COL_DATA_SET_VAL_GOTO macros — confirm before removing any.
 */
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SUserObj *pUser = NULL;
  SDbObj   *pDb = NULL, *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VNODES, PRIV_OBJ_DB, 0, _OVER);

  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VNODES, _OVER);

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      // dnode id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->dnodeId, false, pVgroup, pShow->pIter, _OVER);
      // vgroup id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)b1, false, pVgroup, pShow->pIter, _OVER);

      // dnode is online?
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        // The row under construction is abandoned (numOfRows is not advanced) and the remaining replicas of
        // this vgroup are skipped.
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
      sdbRelease(pSdb, pDnode);

      // sync state; reported as offline whenever the hosting dnode is not online
      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      // role time, zero when the dnode is not online
      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&roleTimeMs, false, pVgroup, pShow->pIter, _OVER);

      // start time, zero when the dnode is not online
      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&startTimeMs, false, pVgroup, pShow->pIter, _OVER);

      // sync-restore flag
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->syncRestore, false, pVgroup, pShow->pIter, _OVER);

      // estimated restore time; left as an empty string when nothing is pending
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      // Use a buffer sized for the string plus the varstr header. restoreStr can hold up to 19 characters, which
      // would not fit in the 20-byte `buf` once the header is prepended
      // (assumes STR_TO_VARSTR writes VARSTR_HEADER_SIZE header bytes followed by the string — TODO confirm).
      char restoreVar[sizeof(restoreStr) + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(restoreVar, restoreStr);
      COL_DATA_SET_VAL_GOTO((const char *)restoreVar, false, pVgroup, pShow->pIter, _OVER);

      // unapplied entry count
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&unappliedCount, false, pVgroup, pShow->pIter, _OVER);

      // buffer segment used
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentUsed, false, pVgroup, pShow->pIter, _OVER);

      // buffer segment size
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentSize, false, pVgroup, pShow->pIter, _OVER);

      numOfRows++;
    }
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve vnode info at line %d since %s", lino, tstrerror(code));
    return code;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1564

1565
// Cancels a vnode-listing fetch iteration: forwards pIter to sdbCancelFetchByType for the SDB_VGROUP type.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1569

1570
/*
 * Adds one replica to pVgroup on a dnode chosen from pArray and records the updated vgroup in pTrans.
 *
 * pArray (elements are SDnodeObj) is sorted with mndCompareDnodeVnodes; the first dnode in that order that does
 * not already host a replica of the vgroup is the only candidate considered. If that candidate has no free vnode
 * slot or not enough memory, the function fails instead of trying the next dnode.
 *
 * On success the new replica is written to pVgroup->vnodeGid[old replica count], pVgroup->replica and the dnode's
 * numOfVnodes / memUsed are increased, and the encoded vgroup is appended to the transaction's group redo log.
 *
 * Returns 0 on success, otherwise TSDB_CODE_MND_NO_ENOUGH_DNODES, TSDB_CODE_MND_NO_ENOUGH_VNODES,
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE, or the error reported while encoding / appending / setting raw status.
 */
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // Validate the element before it is dereferenced below.
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    // Skip dnodes that already host a replica of this vgroup.
    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    TAOS_RETURN(code);
  }

  // Every dnode in pArray already hosts a replica of this vgroup (or pArray is empty).
  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1640

1641
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
17,038✔
1642
                                        SVnodeGid *pDelVgid) {
1643
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
17,038✔
1644
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
66,349✔
1645
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
49,311✔
1646
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
49,311✔
1647
          pDnode->numOfOtherNodes);
1648
  }
1649

1650
  int32_t code = -1;
17,038✔
1651
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
19,766✔
1652
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
19,466✔
1653

1654
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
38,555✔
1655
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
35,827✔
1656
      if (pVgid->dnodeId == pDnode->id) {
35,827✔
1657
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
16,738✔
1658
        pDnode->memUsed -= vgMem;
16,738✔
1659
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
16,738✔
1660
              " used:%" PRId64,
1661
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1662
        pDnode->numOfVnodes--;
16,738✔
1663
        pVgroup->replica--;
16,738✔
1664
        *pDelVgid = *pVgid;
16,738✔
1665
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
16,738✔
1666
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
16,738✔
1667
        code = 0;
16,738✔
1668
        goto _OVER;
16,738✔
1669
      }
1670
    }
1671
  }
1672

1673
_OVER:
300✔
1674
  if (code != 0) {
17,038✔
1675
    code = TSDB_CODE_APP_ERROR;
300✔
1676
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
300✔
1677
           tstrerror(code));
1678
    TAOS_RETURN(code);
300✔
1679
  }
1680

1681
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
46,694✔
1682
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
29,956✔
1683
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
29,956✔
1684
          pVgid->dnodeId);
1685
  }
1686

1687
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
16,738✔
1688
  if (pVgRaw == NULL) {
16,738✔
1689
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1690
    if (terrno != 0) code = terrno;
×
1691
    TAOS_RETURN(code);
×
1692
  }
1693
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
16,738✔
1694
    sdbFreeRaw(pVgRaw);
×
1695
    TAOS_RETURN(code);
×
1696
  }
1697
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
16,738✔
1698
  if (code != 0) {
16,738✔
1699
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1700
           tstrerror(code), __LINE__);
1701
  }
1702

1703
  TAOS_RETURN(code);
16,738✔
1704
}
1705

1706
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1707
                                                   SVnodeGid *pDelVgid) {
1708
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1709
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1710
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1711
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1712
  }
1713

1714
  int32_t code = -1;
×
1715
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1716
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1717

1718
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1719
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1720
      if (pVgid->dnodeId == pDnode->id) {
×
1721
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1722
        pDnode->memUsed -= vgMem;
×
1723
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1724
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1725
        pDnode->numOfVnodes--;
×
1726
        pVgroup->replica--;
×
1727
        *pDelVgid = *pVgid;
×
1728
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1729
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1730
        code = 0;
×
1731
        goto _OVER;
×
1732
      }
1733
    }
1734
  }
1735

1736
_OVER:
×
1737
  if (code != 0) {
×
1738
    code = TSDB_CODE_APP_ERROR;
×
1739
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1740
    TAOS_RETURN(code);
×
1741
  }
1742

1743
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1744
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1745
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1746
  }
1747

1748
  TAOS_RETURN(code);
×
1749
}
1750

1751
/*
 * Appends a TDMT_DND_CREATE_VNODE redo action to pTrans, addressed to the dnode referenced by pVgid.
 *
 * The request body is built by mndBuildCreateVnodeReq; TSDB_CODE_VND_ALREADY_EXIST is registered as an acceptable
 * result code and the action is grouped under pVgroup->vgId.
 *
 * Returns 0 on success, -1 when the dnode cannot be acquired or the request cannot be built, or the error code
 * from mndTransAppendRedoAction (in which case the request buffer is freed here).
 */
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) return -1;
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  // Release only after the request has been built: pDnode is an argument of mndBuildCreateVnodeReq, so the
  // reference must still be held while that call runs.
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) return -1;

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1777

1778
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,125✔
1779
                                       SDnodeObj *pDnode) {
1780
  int32_t      code = 0;
2,125✔
1781
  STransAction action = {0};
2,125✔
1782

1783
  action.epSet = mndGetDnodeEpset(pDnode);
2,125✔
1784

1785
  int32_t contLen = 0;
2,125✔
1786
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
2,125✔
1787
  if (pReq == NULL) {
2,125✔
1788
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1789
    if (terrno != 0) code = terrno;
×
1790
    TAOS_RETURN(code);
×
1791
  }
1792

1793
  action.pCont = pReq;
2,125✔
1794
  action.contLen = contLen;
2,125✔
1795
  action.msgType = TDMT_DND_CREATE_VNODE;
2,125✔
1796
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
2,125✔
1797
  action.groupId = pVgroup->vgId;
2,125✔
1798

1799
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,125✔
1800
    taosMemoryFree(pReq);
×
1801
    TAOS_RETURN(code);
×
1802
  }
1803

1804
  TAOS_RETURN(code);
2,125✔
1805
}
1806

1807
/*
 * Appends a TDMT_VND_ALTER_CONFIRM redo action to pTrans, addressed to the vgroup's endpoint set.
 *
 * The message consists of a bare SMsgHead whose contLen and vgId are stored with htonl. The action is grouped
 * under pVgroup->vgId and uses TSDB_CODE_VND_INVALID_VGROUP_ID as its retry code.
 *
 * Returns 0 on success, terrno if the head cannot be allocated, or the error from mndTransAppendRedoAction (the
 * head is freed here in that case).
 */
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t      code = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);

  const int32_t headLen = sizeof(SMsgHead);
  SMsgHead     *pMsg = taosMemoryMalloc(headLen);
  if (pMsg == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsg->contLen = htonl(headLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  action.msgType = TDMT_VND_ALTER_CONFIRM;
  action.pCont = pMsg;
  action.contLen = headLen;
  action.groupId = pVgroup->vgId;
  // an incorrect redirect result will cause this error, so it is registered as the retry code
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1836

1837
/*
 * Appends a TDMT_SYNC_CONFIG_CHANGE redo action to pTrans, addressed to pNewVgroup's endpoint set.
 *
 * The payload is an SMsgHead (contLen and vgId stored with htonl) immediately followed by the body produced by
 * mndBuildAlterVnodeReplicaReq for pNewVgroup / dnodeId. pOldVgroup is not referenced by this function.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the body cannot be built;
 * terrno if the combined message cannot be allocated; otherwise the error from mndTransAppendRedoAction, in which
 * case the combined message is freed here.
 */
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      bodyLen = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  void *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);

  // The body goes right behind the head; the standalone body buffer is no longer needed afterwards.
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  action.msgType = TDMT_SYNC_CONFIG_CHANGE;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1876

1877
/*
 * Appends a TDMT_VND_ALTER_HASHRANGE redo action to pTrans, addressed to pVgroup's endpoint set.
 *
 * The request body comes from mndBuildAlterVnodeHashRangeReq(srcVgId, pVgroup); TSDB_CODE_VND_ALREADY_EXIST is
 * registered as an acceptable result code.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request cannot be
 * built; otherwise the error from mndTransAppendRedoAction, in which case the request buffer is freed here.
 */
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pBody = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_ALTER_HASHRANGE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.pCont = pBody;
  action.contLen = reqLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1903

1904
/*
 * Appends a TDMT_VND_ALTER_CONFIG redo action to pTrans, addressed to pVgroup's endpoint set and grouped under
 * pVgroup->vgId. The request body comes from mndBuildAlterVnodeConfigReq.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request cannot be
 * built; otherwise the error from mndTransAppendRedoAction, in which case the request buffer is freed here.
 */
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pBody = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_ALTER_CONFIG;
  action.groupId = pVgroup->vgId;
  action.pCont = pBody;
  action.contLen = reqLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1929

1930
/*
 * Encodes pVg and appends it to pTrans as a prepare log with raw status SDB_STATUS_CREATING.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding fails; the error
 * from mndTransAppendPrepareLog, in which case the raw is freed here; or the error from sdbSetRawStatus.
 */
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _err;
  }

  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);

  // Store the status result in `code` so a failure is both logged and returned; previously the result was only
  // compared inline, which left the subsequent `code != 0` check unreachable.
  // The raw is not freed on this path because it has already been appended to the transaction
  // (NOTE(review): presumably the transaction owns it from here on — confirm against mndTransAppendPrepareLog).
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);

_err:
  sdbFreeRaw(pRaw);
  TAOS_RETURN(code);
}
1954

1955
/*
 * Appends a TDMT_VND_ALTER_REPLICA redo action to pTrans, addressed to the dnode identified by dnodeId and
 * grouped under pVgroup->vgId. The request body comes from mndBuildAlterVnodeReplicaReq.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode cannot be
 * acquired or the request cannot be built; otherwise the error from mndTransAppendRedoAction, in which case the
 * request buffer is freed here.
 */
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode object, so it is released right after the copy.
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_ALTER_REPLICA;
  action.groupId = pVgroup->vgId;
  action.pCont = pBody;
  action.contLen = reqLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1988

1989
/*
 * Appends a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action to pTrans, addressed to the dnode identified by
 * dnodeId. The request body comes from mndBuildCheckLearnCatchupReq. TSDB_CODE_VND_ALREADY_IS_VOTER is registered
 * as an acceptable result code and TSDB_CODE_VND_NOT_CATCH_UP as the retry code. The action's groupId is left at
 * its zero-initialised value.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode cannot be
 * acquired or the request cannot be built; otherwise the error from mndTransAppendRedoAction, in which case the
 * request buffer is freed here.
 */
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode object, so it is released right after the copy.
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.pCont = pBody;
  action.contLen = reqLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
2023

2024
/*
 * Appends a TDMT_DND_ALTER_VNODE_TYPE redo action to pTrans, addressed to the dnode identified by dnodeId and
 * grouped under pVgroup->vgId. The request body comes from mndBuildAlterVnodeReplicaReq.
 * TSDB_CODE_VND_ALREADY_IS_VOTER is registered as an acceptable result code and TSDB_CODE_VND_NOT_CATCH_UP as the
 * retry code.
 *
 * Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode cannot be
 * acquired or the request cannot be built; otherwise the error from mndTransAppendRedoAction, in which case the
 * request buffer is freed here.
 */
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode object, so it is released right after the copy.
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.groupId = pVgroup->vgId;
  action.pCont = pBody;
  action.contLen = reqLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
2059

2060
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,125✔
2061
                                          SDnodeObj *pDnode) {
2062
  int32_t      code = 0;
2,125✔
2063
  STransAction action = {0};
2,125✔
2064
  action.epSet = mndGetDnodeEpset(pDnode);
2,125✔
2065

2066
  int32_t contLen = 0;
2,125✔
2067
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
2,125✔
2068
  if (pReq == NULL) {
2,125✔
2069
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2070
    if (terrno != 0) code = terrno;
×
2071
    TAOS_RETURN(code);
×
2072
  }
2073

2074
  action.pCont = pReq;
2,125✔
2075
  action.contLen = contLen;
2,125✔
2076
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
2,125✔
2077
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
2,125✔
2078
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
2,125✔
2079
  action.groupId = pVgroup->vgId;
2,125✔
2080

2081
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,125✔
2082
    taosMemoryFree(pReq);
×
2083
    TAOS_RETURN(code);
×
2084
  }
2085

2086
  TAOS_RETURN(code);
2,125✔
2087
}
2088

2089
// Queues a TDMT_VND_DISABLE_WRITE redo action in pTrans for pVgroup, addressed to the dnode
// identified by dnodeId. The dnode is acquired only long enough to copy its endpoint set.
// Returns 0 on success. When the dnode cannot be acquired or the request cannot be built the
// result is terrno if it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. When the append
// fails the request is freed here and the append's error code is returned.
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCont = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pCont == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  redo.pCont = pCont;
  redo.contLen = reqLen;
  redo.msgType = TDMT_VND_DISABLE_WRITE;

  int32_t code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the action was not accepted, so the request buffer is still ours to free
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
2122

2123
// Queues a TDMT_DND_DROP_VNODE action in pTrans for the vnode of pVgroup that lives on
// pVgid->dnodeId. The action goes to the redo list when isRedo is true, otherwise to the undo list.
// TSDB_CODE_VND_NOT_EXIST is registered as an acceptable response code.
// Returns 0 on success. When the dnode cannot be acquired or the request cannot be built the
// result is terrno if it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. When the append
// fails the request is freed here and the append's error code is returned.
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // Build the request while the dnode reference is still held: mndBuildDropVnodeReq() receives
  // pDnode, so the reference must not be dropped before that call returns.
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure reason before any further call can touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  code = isRedo ? mndTransAppendRedoAction(pTrans, &action) : mndTransAppendUndoAction(pTrans, &action);
  if (code != 0) {
    // the action was not accepted, so the request buffer is still ours to free
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2165

2166
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
14,904✔
2167
                                    SArray *pArray, bool force, bool unsafe) {
2168
  int32_t code = 0;
14,904✔
2169
  SVgObj  newVg = {0};
14,904✔
2170
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
14,904✔
2171

2172
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
14,904✔
2173
  for (int32_t i = 0; i < newVg.replica; ++i) {
48,146✔
2174
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
33,242✔
2175
  }
2176

2177
  if (!force) {
14,904✔
2178
#if 1
2179
    {
2180
#else
2181
    if (newVg.replica == 1) {
2182
#endif
2183
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
14,904✔
2184
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
14,904✔
2185
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
48,146✔
2186
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
33,242✔
2187
      }
2188
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
14,904✔
2189
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
14,904✔
2190

2191
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
14,904✔
2192
      newVg.replica--;
14,904✔
2193
      SVnodeGid del = newVg.vnodeGid[vnIndex];
14,904✔
2194
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
14,904✔
2195
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
14,904✔
2196
      {
2197
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
14,904✔
2198
        if (pRaw == NULL) {
14,904✔
2199
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2200
          if (terrno != 0) code = terrno;
×
2201
          TAOS_RETURN(code);
×
2202
        }
2203
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
14,904✔
2204
          sdbFreeRaw(pRaw);
×
2205
          TAOS_RETURN(code);
×
2206
        }
2207
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
14,904✔
2208
        if (code != 0) {
14,904✔
2209
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2210
          return code;
×
2211
        }
2212
      }
2213

2214
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
14,904✔
2215
      for (int32_t i = 0; i < newVg.replica; ++i) {
48,146✔
2216
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
33,242✔
2217
      }
2218
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
14,904✔
2219
#if 1
2220
    }
2221
#else
2222
    } else {  // new replica == 3
2223
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2224
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2225
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2226
      newVg.replica--;
2227
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2228
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2229
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2230
      {
2231
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2232
        if (pRaw == NULL) return -1;
2233
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2234
          sdbFreeRaw(pRaw);
2235
          return -1;
2236
        }
2237
      }
2238

2239
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2240
      for (int32_t i = 0; i < newVg.replica; ++i) {
2241
        if (i == vnIndex) continue;
2242
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2243
      }
2244
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2245
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2246
    }
2247
#endif
2248
  } else {
2249
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2250
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2251
    newVg.replica--;
×
2252
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2253
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2254
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2255
    {
2256
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2257
      if (pRaw == NULL) {
×
2258
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2259
        if (terrno != 0) code = terrno;
×
2260
        TAOS_RETURN(code);
×
2261
      }
2262
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2263
        sdbFreeRaw(pRaw);
×
2264
        TAOS_RETURN(code);
×
2265
      }
2266
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2267
      if (code != 0) {
×
2268
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2269
        return code;
×
2270
      }
2271
    }
2272

2273
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2274
      if (i != vnIndex) {
×
2275
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2276
      }
2277
    }
2278
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2279
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2280

2281
    if (newVg.replica == 1) {
×
2282
      if (force && !unsafe) {
×
2283
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2284
      }
2285

2286
      SSdb *pSdb = pMnode->pSdb;
×
2287
      void *pIter = NULL;
×
2288

2289
      while (1) {
×
2290
        SStbObj *pStb = NULL;
×
2291
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2292
        if (pIter == NULL) break;
×
2293

2294
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2295
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2296
            sdbCancelFetch(pSdb, pIter);
×
2297
            sdbRelease(pSdb, pStb);
×
2298
            TAOS_RETURN(code);
×
2299
          }
2300
        }
2301

2302
        sdbRelease(pSdb, pStb);
×
2303
      }
2304

2305
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2306
    }
2307
  }
2308

2309
  {
2310
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
14,904✔
2311
    if (pRaw == NULL) {
14,904✔
2312
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2313
      if (terrno != 0) code = terrno;
×
2314
      TAOS_RETURN(code);
×
2315
    }
2316
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
14,904✔
2317
      sdbFreeRaw(pRaw);
×
2318
      TAOS_RETURN(code);
×
2319
    }
2320
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
14,904✔
2321
    if (code != 0) {
14,904✔
2322
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2323
      return code;
×
2324
    }
2325
  }
2326

2327
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
14,904✔
2328
  for (int32_t i = 0; i < newVg.replica; ++i) {
48,146✔
2329
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
33,242✔
2330
  }
2331
  TAOS_RETURN(code);
14,904✔
2332
}
2333

2334
// Walks every vgroup in the sdb and, for each one that has a replica on delDnodeId, adds the
// actions that move that replica elsewhere (see mndSetMoveVgroupInfoToTrans) to pTrans.
// Candidate target dnodes come from mndBuildDnodesArray(pMnode, delDnodeId, NULL).
// Iteration stops at the first failure.
// Returns 0 on success, otherwise the error code of the first failing step.
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // locate the replica hosted by the dnode being removed, if any
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // The move logic dereferences pDb, so a vgroup whose db cannot be acquired must fail
        // the whole operation instead of being passed on.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      // leaving the loop early: the iterator must be cancelled explicitly
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2377

2378
// Grows pVgroup (modified in place) by one replica on newDnodeId and adds the matching steps to
// pTrans: a redo log of the enlarged vgroup, alter-replica actions for the existing members,
// creation of the new vnode as a learner, its promotion to voter, a second round of
// alter-replica actions, and a final confirm action.
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // take the next free slot for the new dnode; it starts out as an offline learner
  SVnodeGid *pAdded = &pVgroup->vnodeGid[pVgroup->replica];
  pVgroup->replica += 1;
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // learner phase: inform the pre-existing members, then create the new vnode
  for (int32_t member = 0; member < pVgroup->replica - 1; ++member) {
    TAOS_CHECK_RETURN(
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[member].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // voter phase: switch the new vnode's role, then inform the pre-existing members again
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  for (int32_t member = 0; member < pVgroup->replica - 1; ++member) {
    TAOS_CHECK_RETURN(
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[member].dnodeId));
  }

  // confirm phase
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2424

2425
// Shrinks pVgroup (modified in place) by removing the replica hosted on delDnodeId and adds the
// matching steps to pTrans: a redo log of the reduced vgroup, a drop-vnode redo action for the
// removed replica, alter-replica actions for the remaining members and a confirm action.
// The last replica entry is moved into the freed slot so the array stays dense.
// Returns 0 when delDnodeId hosts no replica of this vgroup (nothing is added) or on success,
// otherwise the error code of the first failing step.
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  SVnodeGid *pGid = NULL;
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      pGid = &pVgroup->vnodeGid[i];
      break;
    }
  }

  if (pGid == NULL) return 0;

  pVgroup->replica--;
  SVnodeGid delGid = *pGid;
  // Struct assignment instead of memcpy: when the removed replica is the last one, source and
  // destination are the same object, which memcpy does not permit but assignment does.
  *pGid = pVgroup->vnodeGid[pVgroup->replica];
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2470

2471
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
37,815✔
2472
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2473
                                     SDnodeObj *pOld3) {
2474
  int32_t code = -1;
37,815✔
2475
  STrans *pTrans = NULL;
37,815✔
2476

2477
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
37,815✔
2478
  if (pTrans == NULL) {
37,815✔
2479
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2480
    if (terrno != 0) code = terrno;
×
2481
    goto _OVER;
×
2482
  }
2483

2484
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
37,815✔
2485
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
37,815✔
2486
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);
37,748✔
2487

2488
  mndTransSetSerial(pTrans);
37,748✔
2489
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
37,748✔
2490

2491
  SVgObj newVg = {0};
37,748✔
2492
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
37,748✔
2493
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
37,748✔
2494
  for (int32_t i = 0; i < newVg.replica; ++i) {
127,320✔
2495
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
89,572✔
2496
          syncStr(newVg.vnodeGid[i].syncState));
2497
  }
2498

2499
  if (pNew1 != NULL && pOld1 != NULL) {
37,748✔
2500
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
37,748✔
2501
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
37,748✔
2502
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
700✔
2503
             pNew1->numOfSupportVnodes);
2504
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
700✔
2505
      goto _OVER;
700✔
2506
    }
2507

2508
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
37,048✔
2509
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
37,048✔
2510
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2511
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2512
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2513
      goto _OVER;
×
2514
    } else {
2515
      pNew1->memUsed += vgMem;
37,048✔
2516
    }
2517

2518
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
37,048✔
2519
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
37,048✔
2520
  }
2521

2522
  if (pNew2 != NULL && pOld2 != NULL) {
37,048✔
2523
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
11,182✔
2524
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
11,182✔
2525
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2526
             pNew2->numOfSupportVnodes);
2527
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2528
      goto _OVER;
×
2529
    }
2530
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
11,182✔
2531
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
11,182✔
2532
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2533
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2534
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2535
      goto _OVER;
×
2536
    } else {
2537
      pNew2->memUsed += vgMem;
11,182✔
2538
    }
2539
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
11,182✔
2540
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
11,182✔
2541
  }
2542

2543
  if (pNew3 != NULL && pOld3 != NULL) {
37,048✔
2544
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
4,396✔
2545
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
4,396✔
2546
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2547
             pNew3->numOfSupportVnodes);
2548
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2549
      goto _OVER;
×
2550
    }
2551
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
4,396✔
2552
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
4,396✔
2553
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2554
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2555
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2556
      goto _OVER;
×
2557
    } else {
2558
      pNew3->memUsed += vgMem;
4,396✔
2559
    }
2560
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
4,396✔
2561
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
4,396✔
2562
  }
2563

2564
  {
2565
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
37,048✔
2566
    if (pRaw == NULL) {
37,048✔
2567
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2568
      if (terrno != 0) code = terrno;
×
2569
      goto _OVER;
×
2570
    }
2571
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
37,048✔
2572
      sdbFreeRaw(pRaw);
×
2573
      goto _OVER;
×
2574
    }
2575
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
37,048✔
2576
    if (code != 0) {
37,048✔
2577
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2578
      goto _OVER;
×
2579
    }
2580
  }
2581

2582
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
37,048✔
2583
  for (int32_t i = 0; i < newVg.replica; ++i) {
124,520✔
2584
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
87,472✔
2585
  }
2586

2587
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
37,048✔
2588
  code = 0;
36,378✔
2589

2590
_OVER:
37,815✔
2591
  mndTransDrop(pTrans);
37,815✔
2592
  mndReleaseDb(pMnode, pDb);
37,815✔
2593
  TAOS_RETURN(code);
37,815✔
2594
}
2595

2596
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
45,839✔
2597
  SMnode    *pMnode = pReq->info.node;
45,839✔
2598
  SDnodeObj *pNew1 = NULL;
45,839✔
2599
  SDnodeObj *pNew2 = NULL;
45,839✔
2600
  SDnodeObj *pNew3 = NULL;
45,839✔
2601
  SDnodeObj *pOld1 = NULL;
45,839✔
2602
  SDnodeObj *pOld2 = NULL;
45,839✔
2603
  SDnodeObj *pOld3 = NULL;
45,839✔
2604
  SVgObj    *pVgroup = NULL;
45,839✔
2605
  SDbObj    *pDb = NULL;
45,839✔
2606
  int32_t    code = -1;
45,839✔
2607
  int64_t    curMs = taosGetTimestampMs();
45,839✔
2608
  int32_t    newDnodeId[3] = {0};
45,839✔
2609
  int32_t    oldDnodeId[3] = {0};
45,839✔
2610
  int32_t    newIndex = -1;
45,839✔
2611
  int32_t    oldIndex = -1;
45,839✔
2612
  int64_t    tss = taosGetTimestampMs();
45,839✔
2613

2614
  SRedistributeVgroupReq req = {0};
45,839✔
2615
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
45,839✔
2616
    code = TSDB_CODE_INVALID_MSG;
×
2617
    goto _OVER;
×
2618
  }
2619

2620
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
45,839✔
2621
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
45,839✔
2622
    goto _OVER;
×
2623
  }
2624

2625
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
45,839✔
2626
  if (pVgroup == NULL) {
45,839✔
2627
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
2,100✔
2628
    if (terrno != 0) code = terrno;
2,100✔
2629
    goto _OVER;
2,100✔
2630
  }
2631
  if (pVgroup->mountVgId) {
43,739✔
2632
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
2633
    goto _OVER;
×
2634
  }
2635
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
43,739✔
2636
  if (pDb == NULL) {
43,739✔
2637
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2638
    if (terrno != 0) code = terrno;
×
2639
    goto _OVER;
×
2640
  }
2641

2642
  if (pVgroup->replica == 1) {
43,739✔
2643
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
11,182✔
2644
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2645
      goto _OVER;
×
2646
    }
2647

2648
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
11,182✔
2649
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2650
      code = 0;
×
2651
      goto _OVER;
×
2652
    }
2653

2654
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
11,182✔
2655
    if (pNew1 == NULL) {
11,182✔
2656
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2657
      if (terrno != 0) code = terrno;
×
2658
      goto _OVER;
×
2659
    }
2660
    if (!mndIsDnodeOnline(pNew1, curMs)) {
11,182✔
2661
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2662
      goto _OVER;
×
2663
    }
2664

2665
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
11,182✔
2666
    if (pOld1 == NULL) {
11,182✔
2667
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2668
      if (terrno != 0) code = terrno;
×
2669
      goto _OVER;
×
2670
    }
2671
    if (!mndIsDnodeOnline(pOld1, curMs)) {
11,182✔
2672
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2673
      goto _OVER;
×
2674
    }
2675

2676
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
11,182✔
2677

2678
  } else if (pVgroup->replica == 3) {
32,557✔
2679
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
31,115✔
2680
      code = TSDB_CODE_MND_INVALID_REPLICA;
2,800✔
2681
      goto _OVER;
2,800✔
2682
    }
2683

2684
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
28,315✔
2685
      code = TSDB_CODE_MND_INVALID_REPLICA;
700✔
2686
      goto _OVER;
700✔
2687
    }
2688

2689
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
27,615✔
2690
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
12,889✔
2691
      newDnodeId[++newIndex] = req.dnodeId1;
11,489✔
2692
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
11,489✔
2693
    }
2694

2695
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
27,615✔
2696
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
18,481✔
2697
      newDnodeId[++newIndex] = req.dnodeId2;
13,412✔
2698
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
13,412✔
2699
    }
2700

2701
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
27,615✔
2702
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
21,846✔
2703
      newDnodeId[++newIndex] = req.dnodeId3;
17,646✔
2704
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
17,646✔
2705
    }
2706

2707
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
27,615✔
2708
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
18,393✔
2709
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
14,225✔
2710
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
14,225✔
2711
    }
2712

2713
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
27,615✔
2714
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
12,977✔
2715
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
11,376✔
2716
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
11,376✔
2717
    }
2718

2719
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
27,615✔
2720
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
21,146✔
2721
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
16,946✔
2722
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
16,946✔
2723
    }
2724

2725
    if (newDnodeId[0] != 0) {
27,615✔
2726
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
26,731✔
2727
      if (pNew1 == NULL) {
26,731✔
2728
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2729
        if (terrno != 0) code = terrno;
×
2730
        goto _OVER;
×
2731
      }
2732
      if (!mndIsDnodeOnline(pNew1, curMs)) {
26,731✔
2733
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
978✔
2734
        goto _OVER;
978✔
2735
      }
2736
    }
2737

2738
    if (newDnodeId[1] != 0) {
26,637✔
2739
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
10,302✔
2740
      if (pNew2 == NULL) {
10,302✔
2741
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2742
        if (terrno != 0) code = terrno;
×
2743
        goto _OVER;
×
2744
      }
2745
      if (!mndIsDnodeOnline(pNew2, curMs)) {
10,302✔
2746
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2747
        goto _OVER;
×
2748
      }
2749
    }
2750

2751
    if (newDnodeId[2] != 0) {
26,637✔
2752
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
4,958✔
2753
      if (pNew3 == NULL) {
4,958✔
2754
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2755
        if (terrno != 0) code = terrno;
×
2756
        goto _OVER;
×
2757
      }
2758
      if (!mndIsDnodeOnline(pNew3, curMs)) {
4,958✔
UNCOV
2759
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2760
        goto _OVER;
×
2761
      }
2762
    }
2763

2764
    if (oldDnodeId[0] != 0) {
26,637✔
2765
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
25,753✔
2766
      if (pOld1 == NULL) {
25,753✔
2767
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2768
        if (terrno != 0) code = terrno;
×
2769
        goto _OVER;
×
2770
      }
2771
      if (!mndIsDnodeOnline(pOld1, curMs)) {
25,753✔
2772
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
562✔
2773
        goto _OVER;
562✔
2774
      }
2775
    }
2776

2777
    if (oldDnodeId[1] != 0) {
26,075✔
2778
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
9,740✔
2779
      if (pOld2 == NULL) {
9,740✔
2780
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2781
        if (terrno != 0) code = terrno;
×
2782
        goto _OVER;
×
2783
      }
2784
      if (!mndIsDnodeOnline(pOld2, curMs)) {
9,740✔
2785
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2786
        goto _OVER;
×
2787
      }
2788
    }
2789

2790
    if (oldDnodeId[2] != 0) {
26,075✔
2791
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
4,396✔
2792
      if (pOld3 == NULL) {
4,396✔
2793
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2794
        if (terrno != 0) code = terrno;
×
2795
        goto _OVER;
×
2796
      }
2797
      if (!mndIsDnodeOnline(pOld3, curMs)) {
4,396✔
2798
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2799
        goto _OVER;
×
2800
      }
2801
    }
2802

2803
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
26,075✔
2804
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2805
      code = 0;
884✔
2806
      goto _OVER;
884✔
2807
    }
2808

2809
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
25,191✔
2810

2811
  } else if (pVgroup->replica == 2) {
1,442✔
2812
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
1,442✔
2813
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2814
      goto _OVER;
×
2815
    }
2816

2817
    if (req.dnodeId1 == req.dnodeId2) {
1,442✔
2818
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2819
      goto _OVER;
×
2820
    }
2821

2822
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
1,442✔
2823
      newDnodeId[++newIndex] = req.dnodeId1;
1,442✔
2824
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,442✔
2825
    }
2826

2827
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,442✔
2828
      newDnodeId[++newIndex] = req.dnodeId2;
1,442✔
2829
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,442✔
2830
    }
2831

2832
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
1,442✔
2833
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
1,442✔
2834
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,442✔
2835
    }
2836

2837
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,442✔
2838
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
1,442✔
2839
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,442✔
2840
    }
2841

2842
    if (newDnodeId[0] != 0) {
1,442✔
2843
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
1,442✔
2844
      if (pNew1 == NULL) {
1,442✔
2845
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2846
        if (terrno != 0) code = terrno;
×
2847
        goto _OVER;
×
2848
      }
2849
      if (!mndIsDnodeOnline(pNew1, curMs)) {
1,442✔
2850
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2851
        goto _OVER;
×
2852
      }
2853
    }
2854

2855
    if (newDnodeId[1] != 0) {
1,442✔
2856
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
1,442✔
2857
      if (pNew2 == NULL) {
1,442✔
2858
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2859
        if (terrno != 0) code = terrno;
×
2860
        goto _OVER;
×
2861
      }
2862
      if (!mndIsDnodeOnline(pNew2, curMs)) {
1,442✔
2863
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2864
        goto _OVER;
×
2865
      }
2866
    }
2867

2868
    if (oldDnodeId[0] != 0) {
1,442✔
2869
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
1,442✔
2870
      if (pOld1 == NULL) {
1,442✔
2871
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2872
        if (terrno != 0) code = terrno;
×
2873
        goto _OVER;
×
2874
      }
2875
      if (!mndIsDnodeOnline(pOld1, curMs)) {
1,442✔
2876
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2877
        goto _OVER;
×
2878
      }
2879
    }
2880

2881
    if (oldDnodeId[1] != 0) {
1,442✔
2882
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
1,442✔
2883
      if (pOld2 == NULL) {
1,442✔
2884
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2885
        if (terrno != 0) code = terrno;
×
2886
        goto _OVER;
×
2887
      }
2888
      if (!mndIsDnodeOnline(pOld2, curMs)) {
1,442✔
2889
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2890
        goto _OVER;
×
2891
      }
2892
    }
2893

2894
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
1,442✔
2895
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2896
      code = 0;
×
2897
      goto _OVER;
×
2898
    }
2899

2900
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
1,442✔
2901
  } else {
2902
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2903
    goto _OVER;
×
2904
  }
2905

2906
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
37,815✔
2907

2908
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
37,815✔
2909
    char obj[33] = {0};
37,815✔
2910
    (void)snprintf(obj, sizeof(obj), "%d", req.vgId);
37,815✔
2911

2912
    int64_t tse = taosGetTimestampMs();
37,815✔
2913
    double  duration = (double)(tse - tss);
37,815✔
2914
    duration = duration / 1000;
37,815✔
2915
    auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen, duration, 0);
37,815✔
2916
  }
2917
_OVER:
45,839✔
2918
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
45,839✔
2919
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
8,577✔
2920
           req.dnodeId3, tstrerror(code));
2921
  }
2922

2923
  mndReleaseDnode(pMnode, pNew1);
45,839✔
2924
  mndReleaseDnode(pMnode, pNew2);
45,839✔
2925
  mndReleaseDnode(pMnode, pNew3);
45,839✔
2926
  mndReleaseDnode(pMnode, pOld1);
45,839✔
2927
  mndReleaseDnode(pMnode, pOld2);
45,839✔
2928
  mndReleaseDnode(pMnode, pOld3);
45,839✔
2929
  mndReleaseVgroup(pMnode, pVgroup);
45,839✔
2930
  mndReleaseDb(pMnode, pDb);
45,839✔
2931
  tFreeSRedistributeVgroupReq(&req);
45,839✔
2932

2933
  TAOS_RETURN(code);
45,839✔
2934
}
2935

2936
// Build the message sent with TDMT_SYNC_FORCE_FOLLOWER: an SMsgHead followed by the serialized
// SForceBecomeFollowerReq of the given vgroup.
//
// pMnode    not used by this function
// pVgroup   vgroup whose vgId is written into both the header and the payload
// dnodeId   not used by this function
// pContLen  out: total length (header + payload); only written on success
//
// Returns a buffer obtained from taosMemoryMalloc, or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when sizing, allocation or serialization fails.
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // First pass with a NULL buffer only measures the payload.
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Only bodyLen bytes remain behind the header, so that is the capacity handed to the encoder
  // (passing the full contLen would overstate the space by sizeof(SMsgHead)).
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
2966

2967
// Append a TDMT_SYNC_FORCE_FOLLOWER redo action to pTrans, addressed to the endpoint set of
// dnode `dnodeId`, carrying a force-become-follower request for pVgroup.
//
// Returns 0 on success. When the dnode cannot be acquired or the request cannot be built the
// result is terrno if it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. A failure of
// mndTransAppendRedoAction is returned as-is after the request buffer has been freed.
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      rc = 0;
  STransAction redo = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    rc = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(rc);
  }
  // The dnode is only needed for its endpoint set; release it right away.
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    rc = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(rc);
  }

  redo.pCont = pMsg;
  redo.contLen = msgLen;
  redo.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  rc = mndTransAppendRedoAction(pTrans, &redo);
  if (rc != 0) {
    // The action was not appended, so the buffer is still ours to free.
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(rc);
}
2999

3000
// Serialize an election-baseline change request (vgId of pVgroup, electBaseLine = ms) into a
// freshly allocated buffer. No SMsgHead is prepended here.
//
// pMnode, pDb and dnodeId are not used by this function.
// pContLen  out: serialized length; only written on success
//
// Returns the buffer, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY.
//
// NOTE(review): the request is encoded with tSerializeSAlterVnodeReplicaReq; presumably
// SAlterVnodeElectBaselineReq is an alias of the replica request type - confirm in the message header.
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen, int32_t ms) {
  SAlterVnodeElectBaselineReq baselineReq = {
      .vgId = pVgroup->vgId,
      .electBaseLine = ms,
  };

  // Measuring pass: NULL buffer, zero capacity.
  int32_t needed = tSerializeSAlterVnodeReplicaReq(NULL, 0, &baselineReq);
  if (needed < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(needed);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t written = tSerializeSAlterVnodeReplicaReq(pBuf, needed, &baselineReq);
  if (written < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", baselineReq.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = needed;
  return pBuf;
}
3028

3029
// Append a TDMT_VND_ALTER_ELECTBASELINE redo action to pTrans for the vnode of pVgroup hosted on
// dnode `dnodeId`, asking it to use `ms` as its election baseline. The action's groupId is the vgId.
//
// Returns 0 on success. When the dnode cannot be acquired or the request cannot be built the
// result is terrno if it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. A failure of
// mndTransAppendRedoAction is returned as-is after the request buffer has been freed.
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                                             SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
  int32_t      rc = 0;
  STransAction redo = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    rc = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(rc);
  }
  // Only the endpoint set is needed from the dnode object.
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &msgLen, ms);
  if (pMsg == NULL) {
    rc = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(rc);
  }

  redo.pCont = pMsg;
  redo.contLen = msgLen;
  redo.msgType = TDMT_VND_ALTER_ELECTBASELINE;
  redo.groupId = pVgroup->vgId;

  rc = mndTransAppendRedoAction(pTrans, &redo);
  if (rc != 0) {
    // Not appended: the buffer was never handed over, free it here.
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(rc);
}
3062

3063
// Append one election-baseline action per replica of pVgroup to pTrans.
//
// The replica whose slot equals `index % 3` gets the short baseline (1500 ms) and every other
// replica gets the long one (5000 ms). With a negative index (this file passes -1) the remainder
// is negative, no slot matches, and every replica receives the long baseline.
//
// Only the slots that are actually populated (the first `replica` entries, at most 3) are visited;
// iterating a fixed 3 slots would read an unused vnodeGid entry for a 2-replica vgroup.
//
// Returns -1 when the vgroup has at most one replica, the first failing action's code otherwise,
// and 0 when all actions were appended.
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans,
                                                              int32_t index) {
  const int32_t preferredBaselineMs = 1500;
  const int32_t defaultBaselineMs = 5000;

  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // Keep the historical upper bound of 3 slots while never going past the populated replicas.
  int32_t slots = (replica < 3) ? replica : 3;

  for (int32_t i = 0; i < slots; i++) {
    int32_t dnodeId = pVgroup->vnodeGid[i].dnodeId;
    int32_t baselineMs = defaultBaselineMs;

    if (i == index % 3) {
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, dnodeId);
      baselineMs = preferredBaselineMs;
    }

    TAOS_CHECK_RETURN(
        mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, dnodeId, baselineMs));
  }

  return 0;
}
3088

3089
// Add the actions that move leadership of pVgroup away from its current leader to pTrans.
//
// Sequence when the current leader's dnode exists and is online:
//   1. per-replica election-baseline actions selected by `index`
//   2. a force-follower action sent to the leader's dnode
//   3. an alter-vnode confirm action
//   4. per-replica election-baseline actions with index -1
//   5. a check that the vgroup's database can still be acquired
//
// Returns -1 when the vgroup has at most one replica, 0 when nothing had to be done (no leader
// found, or its dnode is missing/offline) or when everything was appended, and the failing step's
// code otherwise.
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t rc = 0;
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // Find the dnode that hosts the current leader; stays 0 when no replica reports leader state.
  int32_t leaderDnodeId = 0;
  for (int slot = 0; slot < replica; slot++) {
    if (pVgroup->vnodeGid[slot].syncState != TAOS_SYNC_STATE_LEADER) continue;
    leaderDnodeId = pVgroup->vnodeGid[slot].dnodeId;
    break;
  }

  bool       exist = false;
  bool       online = false;
  int64_t    nowMs = taosGetTimestampMs();
  SDnodeObj *pLeader = mndAcquireDnode(pMnode, leaderDnodeId);
  if (pLeader != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pLeader, nowMs);
    mndReleaseDnode(pMnode, pLeader);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, leaderDnodeId,
          exist, online);
    TAOS_RETURN(rc);
  }

  mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgid, leaderDnodeId);
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));

  rc = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, leaderDnodeId);
  if (rc != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, leaderDnodeId);
    TAOS_RETURN(rc);
  }

  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));

  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));

  // The database object is acquired only to verify that it still exists.
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    rc = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid,
           leaderDnodeId);
    TAOS_RETURN(rc);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(rc);
}
3149

3150
// Implementation hook for the balance-leader request. A stub is defined below when TD_ENTERPRISE
// is not defined; presumably the enterprise build supplies its own definition elsewhere.
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

// Entry point for the balance-leader request: forwards to the implementation hook unchanged.
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  return mndProcessVgroupBalanceLeaderMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and reports success.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
3157

3158
// Re-account vgroup memory on every dnode in pArray and verify that none of them runs out.
//
// For each dnode: if it hosts a replica of pOldVgroup the old vgroup memory is subtracted, if it
// hosts a replica of pNewVgroup the new vgroup memory is added. The dnode's memUsed field in
// pArray is updated in place.
//
// Returns TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE as soon as a dnode has memAvail - memUsed <= 0
// (dnodes visited before it keep their updated memUsed), otherwise 0.
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  int32_t numDnodes = (int32_t)taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numDnodes; ++idx) {
    SDnodeObj *pCur = taosArrayGet(pArray, idx);
    int64_t    memBefore = 0;
    int64_t    memAfter = 0;
    bool       hostsReplica = false;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pCur->id, pCur->memAvail, pCur->memUsed);

    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pCur->id) continue;
      memBefore = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hostsReplica = true;
    }

    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pCur->id) continue;
      memAfter = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hostsReplica = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pCur->id, memBefore, memAfter);

    pCur->memUsed = pCur->memUsed - memBefore + memAfter;

    if (pCur->memAvail - pCur->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pCur->id, pCur->memAvail, pCur->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    // Only dnodes that host a replica of either vgroup are worth an info-level line.
    if (hostsReplica) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pCur->id, pCur->memAvail, pCur->memUsed);
    }
  }

  return 0;
}
3198

3199
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
244,068✔
3200
                                  SArray *pArray, SVgObj *pNewVgroup) {
3201
  int32_t code = 0;
244,068✔
3202
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
244,068✔
3203

3204
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
244,068✔
3205
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
206,394✔
3206
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
206,394✔
3207
    return 0;
206,394✔
3208
  }
3209

3210
  // mndTransSetGroupParallel(pTrans);
3211

3212
  if (pNewDb->cfg.replications == 3) {
37,674✔
3213
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
32,710✔
3214
          pVgroup->vnodeGid[0].dnodeId);
3215

3216
    // add second
3217
    if (pNewVgroup->replica == 1) {
32,710✔
3218
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
32,710✔
3219
    }
3220

3221
    // learner stage
3222
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
31,988✔
3223
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
31,988✔
3224
    TAOS_CHECK_RETURN(
31,988✔
3225
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3226

3227
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
31,988✔
3228

3229
    // follower stage
3230
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
31,988✔
3231
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
31,988✔
3232
    TAOS_CHECK_RETURN(
31,988✔
3233
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3234

3235
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
31,988✔
3236

3237
    // add third
3238
    if (pNewVgroup->replica == 2) {
31,988✔
3239
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
31,988✔
3240
    }
3241

3242
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
29,390✔
3243
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
29,390✔
3244
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
29,390✔
3245
    TAOS_CHECK_RETURN(
29,390✔
3246
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3247
    TAOS_CHECK_RETURN(
29,390✔
3248
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3249
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
29,390✔
3250

3251
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
29,390✔
3252
  } else if (pNewDb->cfg.replications == 1) {
4,964✔
3253
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
3,520✔
3254
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3255
          pVgroup->vnodeGid[2].dnodeId);
3256

3257
    SVnodeGid del1 = {0};
3,520✔
3258
    SVnodeGid del2 = {0};
3,520✔
3259
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
3,520✔
3260
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
3,520✔
3261
    TAOS_CHECK_RETURN(
3,520✔
3262
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3263
    TAOS_CHECK_RETURN(
3,520✔
3264
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3265
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,520✔
3266

3267
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
3,520✔
3268
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
3,520✔
3269
    TAOS_CHECK_RETURN(
3,520✔
3270
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3271
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,520✔
3272
  } else if (pNewDb->cfg.replications == 2) {
1,444✔
3273
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,444✔
3274
          pVgroup->vnodeGid[0].dnodeId);
3275

3276
    // add second
3277
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,444✔
3278

3279
    // learner stage
3280
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,444✔
3281
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,444✔
3282
    TAOS_CHECK_RETURN(
1,444✔
3283
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3284

3285
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,444✔
3286

3287
    // follower stage
3288
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,444✔
3289
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,444✔
3290
    TAOS_CHECK_RETURN(
1,444✔
3291
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3292

3293
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,444✔
3294
  } else {
3295
    return -1;
×
3296
  }
3297

3298
  mndSortVnodeGid(pNewVgroup);
34,354✔
3299

3300
  {
3301
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
34,354✔
3302
    if (pVgRaw == NULL) {
34,354✔
3303
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3304
      if (terrno != 0) code = terrno;
×
3305
      TAOS_RETURN(code);
×
3306
    }
3307
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
34,354✔
3308
      sdbFreeRaw(pVgRaw);
×
3309
      TAOS_RETURN(code);
×
3310
    }
3311
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
34,354✔
3312
    if (code != 0) {
34,354✔
3313
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3314
      TAOS_RETURN(code);
×
3315
    }
3316
  }
3317

3318
  TAOS_RETURN(code);
34,354✔
3319
}
3320

3321
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3322
                                      SArray *pArray) {
3323
  int32_t code = 0;
×
3324
  SVgObj  newVgroup = {0};
×
3325
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3326

3327
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3328
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3329
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3330
    return 0;
×
3331
  }
3332

3333
  mndTransSetSerial(pTrans);
×
3334

3335
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3336
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3337

3338
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3339
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3340
          pVgroup->vnodeGid[0].dnodeId);
3341

3342
    // add second
3343
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3344
    // add third
3345
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3346

3347
    // add learner stage
3348
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3349
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3350
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3351
    TAOS_CHECK_RETURN(
×
3352
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3353
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3354
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3355
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3356
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3357
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3358
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3359
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3360
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3361

3362
    // check learner
3363
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3364
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3365
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3366
    TAOS_CHECK_RETURN(
×
3367
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3368
    TAOS_CHECK_RETURN(
×
3369
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3370

3371
    // change raft type
3372
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3373
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3374
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3375
    TAOS_CHECK_RETURN(
×
3376
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3377

3378
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3379

3380
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3381
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3382
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3383
    TAOS_CHECK_RETURN(
×
3384
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3385

3386
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3387

3388
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3389
    if (pVgRaw == NULL) {
×
3390
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3391
      if (terrno != 0) code = terrno;
×
3392
      TAOS_RETURN(code);
×
3393
    }
3394
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3395
      sdbFreeRaw(pVgRaw);
×
3396
      TAOS_RETURN(code);
×
3397
    }
3398
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3399
    if (code != 0) {
×
3400
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3401
             __LINE__);
3402
      TAOS_RETURN(code);
×
3403
    }
3404
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3405
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3406
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3407

3408
    SVnodeGid del1 = {0};
×
3409
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3410

3411
    TAOS_CHECK_RETURN(
×
3412
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3413

3414
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3415

3416
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3417

3418
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3419
    if (pVgRaw == NULL) {
×
3420
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3421
      if (terrno != 0) code = terrno;
×
3422
      TAOS_RETURN(code);
×
3423
    }
3424
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3425
      sdbFreeRaw(pVgRaw);
×
3426
      TAOS_RETURN(code);
×
3427
    }
3428
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3429
    if (code != 0) {
×
3430
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3431
             __LINE__);
3432
      TAOS_RETURN(code);
×
3433
    }
3434

3435
    SVnodeGid del2 = {0};
×
3436
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3437

3438
    TAOS_CHECK_RETURN(
×
3439
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3440

3441
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3442

3443
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3444

3445
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3446
    if (pVgRaw == NULL) {
×
3447
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3448
      if (terrno != 0) code = terrno;
×
3449
      TAOS_RETURN(code);
×
3450
    }
3451
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3452
      sdbFreeRaw(pVgRaw);
×
3453
      TAOS_RETURN(code);
×
3454
    }
3455
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3456
    if (code != 0) {
×
3457
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3458
             __LINE__);
3459
      TAOS_RETURN(code);
×
3460
    }
3461
  } else {
3462
    return -1;
×
3463
  }
3464

3465
  mndSortVnodeGid(&newVgroup);
×
3466

3467
  {
3468
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3469
    if (pVgRaw == NULL) {
×
3470
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3471
      if (terrno != 0) code = terrno;
×
3472
      TAOS_RETURN(code);
×
3473
    }
3474
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3475
      sdbFreeRaw(pVgRaw);
×
3476
      TAOS_RETURN(code);
×
3477
    }
3478
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3479
    if (code != 0) {
×
3480
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3481
             __LINE__);
3482
      TAOS_RETURN(code);
×
3483
    }
3484
  }
3485

3486
  TAOS_RETURN(code);
×
3487
}
3488

3489
// Add to pTrans the actions that restore the vnode of pVgroup located on pDnode, working on a
// local copy of the vgroup.
//
// - 1 replica: (re)create the vnode; the slot matching pDnode is used, slot 0 otherwise.
// - 2 replicas: mark pDnode's member as learner, send an alter-type action to pAnotherDnode,
//   recreate the vnode on pDnode, then turn every member into a voter and send alter-type actions
//   to pDnode and pAnotherDnode.
// - 3 replicas: mark pDnode's member as learner, recreate the vnode on pDnode, then turn every
//   member into a voter and send an alter-type action to pDnode.
// - Any other replica count: no restore actions are added.
//
// In all cases the (possibly modified) vgroup copy is appended to the commit log with status
// SDB_STATUS_READY.
//
// Returns 0 on success, otherwise the code of the first failing step.
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t rc = 0;
  SVgObj  work = {0};
  memcpy(&work, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  switch (work.replica) {
    case 1: {
      int slot = 0;
      for (int k = 0; k < work.replica; k++) {
        work.vnodeGid[k].nodeRole = TAOS_SYNC_ROLE_VOTER;
        if (work.vnodeGid[k].dnodeId == pDnode->id) slot = k;
      }
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &work, &work.vnodeGid[slot]));
      break;
    }

    case 2: {
      // the member being restored is a learner, the surviving one a voter
      for (int k = 0; k < work.replica; k++) {
        work.vnodeGid[k].nodeRole =
            (work.vnodeGid[k].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &work, pAnotherDnode));

      // same role layout is re-applied before recreating the vnode
      for (int k = 0; k < work.replica; k++) {
        work.vnodeGid[k].nodeRole =
            (work.vnodeGid[k].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &work, pDnode));

      // finally everyone becomes a voter
      for (int k = 0; k < work.replica; k++) {
        work.vnodeGid[k].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &work, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &work, pAnotherDnode));
      break;
    }

    case 3: {
      for (int k = 0; k < work.replica; k++) {
        work.vnodeGid[k].nodeRole =
            (work.vnodeGid[k].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &work, pDnode));

      for (int k = 0; k < work.replica; k++) {
        work.vnodeGid[k].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &work, pDnode));
      break;
    }

    default:
      break;
  }

  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&work);
  if (pCommitRaw == NULL) {
    rc = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(rc);
  }

  rc = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (rc != 0) {
    // not appended: the raw is still ours
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(rc);
  }

  rc = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (rc != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", work.vgId, tstrerror(rc), __LINE__);
  }

  TAOS_RETURN(rc);
}
3568

3569
/*
 * Placeholder for a hash-range adjustment step. Nothing is appended to the
 * transaction: every argument is ignored and 0 is returned unconditionally.
 */
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;
  return 0;
}
3572

3573
// Signature of the transaction log-append routines (mndTransAppendCommitlog / mndTransAppendRedolog)
// that mndAddVgStatusAction and mndAddDbStatusAction choose between according to the requested stage.
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3574

3575
/*
 * Encode pVg into a raw sdb record, append it to pTrans and stamp it with vgStatus.
 *
 * pTrans   - transaction that receives the record
 * pVg      - vgroup object to encode
 * vgStatus - sdb status written into the raw record
 * stage    - TRN_STAGE_COMMIT_ACTION appends to the commit log; any other value
 *            appends to the redo log
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // the append was rejected, so the raw record still belongs to this function
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // NOTE(review): once appended, the record is presumably held by the transaction (the other
  // append sites in this file never free it after a successful append), so a failure below
  // must not free pRaw again.
  code = sdbSetRawStatus(pRaw, vgStatus);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3596

3597
/*
 * Encode pDb into a raw sdb record, append it to pTrans and stamp it with dbStatus.
 *
 * pTrans   - transaction that receives the record
 * pDb      - database object to encode
 * dbStatus - sdb status written into the raw record
 * stage    - TRN_STAGE_COMMIT_ACTION appends to the commit log; any other value
 *            appends to the redo log
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
  SSdbRaw        *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // the append was rejected, so the raw record still belongs to this function
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // NOTE(review): once appended, the record is presumably held by the transaction (the other
  // append sites in this file never free it after a successful append), so a failure below
  // must not free pRaw again.
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3618

3619
/*
 * Build and prepare a "split-vgroup" transaction that turns pVgroup into two
 * single-replica vgroups, each covering half of the original hash range, and
 * then (if the db is configured with a different replica count) queues the
 * actions that bring both new vgroups back to pDb->cfg.replications.
 *
 * pMnode  - mnode context
 * pReq    - originating rpc request, attached to the transaction
 * pDb     - database owning the vgroup
 * pVgroup - vgroup to split; it is marked SDB_STATUS_DROPPED by the transaction
 *
 * Returns 0 when the transaction was prepared, otherwise an error code.
 * With USE_SHARED_STORAGE and tsSsEnabled set, the split is refused with
 * TSDB_CODE_OPS_NOT_SUPPORT.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  // the dnode array is handed to the add/remove vnode helpers below; bail out early if it could not be built
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  /*
    if (pDb->cfg.withArbitrator) {
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
      goto _OVER;
    }
  */

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  // work on a private copy; the object passed in is only used for the final DROPPED records
  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  // step 1: bring the vgroup to two vnodes (slot 0 and slot 1)
  if (newVg1.replica == 1) {
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    // the added vnode is created as a learner and switched to voter afterwards
    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // NOTE(review): any other replica count falls through unchanged; the bail-out was disabled upstream
    // goto _OVER;
  }

  // step 2: stop writes on every vnode before the split
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // step 3: newVg1 keeps slot 0 and the lower half of the range, newVg2 takes slot 1 and the upper half
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  // iterate over newVg2's own replica count (the original looped on newVg1.replica)
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // the local copy needs its own array because _OVER destroys dbObj.cfg.pRetensions
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3781

3782
// Real implementation of the split-vgroup request; defined outside this block
// unless TD_ENTERPRISE is undefined, in which case the stub below is used.
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

// Thin forwarder from the message handler to the implementation.
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  return mndProcessSplitVgroupMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: ignores the request and returns 0.
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3789

3790
/*
 * Queue the actions that move one replica of pVgroup from dnode pSrc to dnode pDst.
 *
 * A copy of the vgroup first gains a replica on pDst, then loses the one on
 * pSrc; the resulting object is appended to the commit log with status
 * SDB_STATUS_READY. The replica layout is logged before and after.
 *
 * Returns 0 on success or the error code of the first failing step.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = NULL;
  SVgObj   newVg = {0};

  memcpy(&newVg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica);
  for (int32_t idx = 0; idx < newVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, idx, newVg.vnodeGid[idx].dnodeId);
  }

  // add the destination first, then remove the source
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id));

  pVgRaw = mndVgroupActionEncode(&newVg);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica);
  for (int32_t idx = 0; idx < newVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, idx, newVg.vnodeGid[idx].dnodeId);
  }
  TAOS_RETURN(code);
}
3827

3828
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
15,232✔
3829
                                            SHashObj *pBalancedVgroups) {
3830
  void   *pIter = NULL;
15,232✔
3831
  int32_t code = -1;
15,232✔
3832
  SSdb   *pSdb = pMnode->pSdb;
15,232✔
3833

3834
  while (1) {
9,397✔
3835
    SVgObj *pVgroup = NULL;
24,629✔
3836
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
24,629✔
3837
    if (pIter == NULL) break;
24,629✔
3838
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
24,629✔
3839
      sdbRelease(pSdb, pVgroup);
8,671✔
3840
      continue;
8,671✔
3841
    }
3842

3843
    bool existInSrc = false;
15,958✔
3844
    bool existInDst = false;
15,958✔
3845
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
46,382✔
3846
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
30,424✔
3847
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
30,424✔
3848
      if (pGid->dnodeId == pDst->id) existInDst = true;
30,424✔
3849
    }
3850

3851
    if (!existInSrc || existInDst) {
15,958✔
3852
      sdbRelease(pSdb, pVgroup);
726✔
3853
      continue;
726✔
3854
    }
3855

3856
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
15,232✔
3857
    if (pDb == NULL) {
15,232✔
3858
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3859
      if (terrno != 0) code = terrno;
×
3860
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3861
      goto _OUT;
×
3862
    }
3863

3864
    if (pDb->cfg.withArbitrator) {
15,232✔
3865
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3866
      goto _OUT;
×
3867
    }
3868

3869
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
15,232✔
3870
    if (code == 0) {
15,232✔
3871
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
15,232✔
3872
    }
3873

3874
  _OUT:
15,232✔
3875
    mndReleaseDb(pMnode, pDb);
15,232✔
3876
    sdbRelease(pSdb, pVgroup);
15,232✔
3877
    sdbCancelFetch(pSdb, pIter);
15,232✔
3878
    break;
15,232✔
3879
  }
3880

3881
  return code;
15,232✔
3882
}
3883

3884
/*
 * Build and prepare a "balance-vgroup" transaction over the dnodes in pArray.
 *
 * Each round sorts pArray with mndCompareDnodeVnodes, takes the last element
 * as the source and the first as the destination, and moves one vgroup
 * between them while the source's score after giving one vnode away is still
 * not below the destination's score after receiving one (within 1e-6).
 * pArray must hold at least two dnodes; the caller checks this.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction with at least one
 * move was prepared, 0 when nothing needed to move, otherwise an error code.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  // vgIds already scheduled for a move, so that a vgroup is moved at most once per transaction
  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    // report a real error code instead of the bare -1 initial value
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // argument order follows the format string: vnodes, others, support, score
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // scores as they would be after moving one vnode from pSrc to pDst
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        // reflect the queued move in the in-memory counters used by the next sort
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // capture the prepare result; otherwise a stale 0 from the loop could be returned on failure
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3952

3953
/*
 * Handler for the balance-vgroup request.
 *
 * Steps: deserialize the request, check MND_OPER_BALANCE_VGROUP privilege,
 * refuse when any SDB_MOUNT entry exists, refuse when any dnode is offline,
 * then build the dnode array and run mndBalanceVgroup on it (skipped when
 * fewer than two dnodes are present). An audit record is written when
 * tsAuditLevel >= AUDIT_LEVEL_CLUSTER and the balance step was reached.
 *
 * Returns 0, TSDB_CODE_ACTION_IN_PROGRESS, or an error code.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();  // reference time for the online check
  int64_t tss = taosGetTimestampMs();    // start time for the audit duration

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // every dnode must be online before any vgroup is moved
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // log the code chosen here; terrno has not been set to it at this point
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;  // milliseconds -> seconds
    auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
4022

4023
// True when pVgroup is not flagged isTsma and its dbUid equals the given dbUid.
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
121,456,270✔
4024

4025
// True when one of the first pVgroup->replica entries of vnodeGid is placed on dnodeId.
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool found = false;
  for (int idx = 0; idx < pVgroup->replica && !found; idx++) {
    found = (pVgroup->vnodeGid[idx].dnodeId == dnodeId);
  }
  return found;
}
4031

4032
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
138,843✔
4033
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4034
                                     ETriggerType triggerType) {
4035
  SCompactVnodeReq compactReq = {0};
138,843✔
4036
  compactReq.dbUid = pDb->uid;
138,843✔
4037
  compactReq.compactStartTime = compactTs;
138,843✔
4038
  compactReq.tw = tw;
138,843✔
4039
  compactReq.metaOnly = metaOnly;
138,843✔
4040
  compactReq.force = force;
138,843✔
4041
  compactReq.optrType = type;
138,843✔
4042
  compactReq.triggerType = triggerType;
138,843✔
4043
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
138,843✔
4044

4045
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
138,843✔
4046
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
138,843✔
4047
  if (contLen < 0) {
138,843✔
4048
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4049
    return NULL;
×
4050
  }
4051
  contLen += sizeof(SMsgHead);
138,843✔
4052

4053
  void *pReq = taosMemoryMalloc(contLen);
138,843✔
4054
  if (pReq == NULL) {
138,843✔
4055
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4056
    return NULL;
×
4057
  }
4058

4059
  SMsgHead *pHead = pReq;
138,843✔
4060
  pHead->contLen = htonl(contLen);
138,843✔
4061
  pHead->vgId = htonl(pVgroup->vgId);
138,843✔
4062

4063
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
138,843✔
4064
    taosMemoryFree(pReq);
×
4065
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4066
    return NULL;
×
4067
  }
4068
  *pContLen = contLen;
138,843✔
4069
  return pReq;
138,843✔
4070
}
4071

4072
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
58,206✔
4073
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4074
                                        ETriggerType triggerType) {
4075
  int32_t      code = 0;
58,206✔
4076
  STransAction action = {0};
58,206✔
4077
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
58,206✔
4078

4079
  int32_t contLen = 0;
58,206✔
4080
  void   *pReq =
4081
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
58,206✔
4082
  if (pReq == NULL) {
58,206✔
4083
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4084
    if (terrno != 0) code = terrno;
×
4085
    TAOS_RETURN(code);
×
4086
  }
4087

4088
  action.pCont = pReq;
58,206✔
4089
  action.contLen = contLen;
58,206✔
4090
  action.msgType = TDMT_VND_COMPACT;
58,206✔
4091

4092
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
58,206✔
4093
    taosMemoryFree(pReq);
×
4094
    TAOS_RETURN(code);
×
4095
  }
4096

4097
  TAOS_RETURN(code);
58,206✔
4098
}
4099

4100
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
58,206✔
4101
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4102
                                    ETriggerType triggerType) {
4103
  TAOS_CHECK_RETURN(
58,206✔
4104
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4105
  return 0;
58,206✔
4106
}
4107

4108
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
80,637✔
4109
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4110
  int32_t      code = 0;
80,637✔
4111
  STransAction action = {0};
80,637✔
4112
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
80,637✔
4113

4114
  int32_t contLen = 0;
80,637✔
4115
  // reuse SCompactVnodeReq as SVTrimDbReq
4116
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
80,637✔
4117
  if (pReq == NULL) {
80,637✔
4118
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4119
    if (terrno != 0) code = terrno;
×
4120
    TAOS_RETURN(code);
×
4121
  }
4122

4123
  action.pCont = pReq;
80,637✔
4124
  action.contLen = contLen;
80,637✔
4125
  action.msgType = TDMT_VND_TRIM;
80,637✔
4126

4127
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
80,637✔
4128
    taosMemoryFree(pReq);
×
4129
    TAOS_RETURN(code);
×
4130
  }
4131

4132
  TAOS_RETURN(code);
80,637✔
4133
}
4134

4135
/*
 * Handler for the set-vgroup-keep-version request.
 *
 * Checks MND_OPER_WRITE_DB privilege, acquires the vgroup named in the
 * request, and prepares a "set-vgroup-keep-version" transaction that
 *   - appends the vgroup object with the new keepVersion / keepVersionTime to
 *     the commit log, and
 *   - appends one TDMT_VND_SET_KEEP_VERSION redo action per replica, sent to
 *     the replica's dnode (TSDB_CODE_VND_STOPPED is accepted as a result).
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 * otherwise an error code.
 */
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;  // released in _OVER unless it was released (and cleared) earlier

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Update SVgObj's keepVersion in mnode
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Append the updated vgroup record to the transaction's commit log
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // the append was rejected, so the raw record is still ours to free
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  // NOTE(review): after a successful append the record is presumably held by the transaction
  // (other append sites in this file do not free it at this point), so it is not freed on failure below.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    // a negative size must not reach the allocation below
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // release before preparing, as the original did; clear the pointer so _OVER does not release twice
  mndReleaseVgroup(pMnode, pVgroup);
  pVgroup = NULL;

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc