• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5049

11 May 2026 06:30AM UTC coverage: 73.313% (+0.09%) from 73.222%
#5049

push

travis-ci

web-flow
feat: refactor taosdump code to improve backup speed and compression ratio (#35292)

6625 of 8435 new or added lines in 28 files covered. (78.54%)

2491 existing lines in 142 files now uncovered.

281233 of 383605 relevant lines covered (73.31%)

132489999.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.99
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndEncryptAlgr.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTopic.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "tmisce.h"
32

33
#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2
34
#define VGROUP_VER_NUMBER                VGROUP_VER_COMPAT_MOUNT_KEEP_VER
35
#define VGROUP_RESERVE_SIZE              60
36
// since 3.3.6.32/3.3.8.6 mountId + keepVersion + keepVersionTime + VGROUP_RESERVE_SIZE = 4 + 8 + 8 + 60 = 80
37
#define DLEN_AFTER_SYNC_CONF_CHANGE_VER 80
38

39
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
40
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
41
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
42
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
43

44
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
45
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
46
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
48

49
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
50
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
51
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
52
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
53
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
54

55
/*
 * Registers everything the vgroup module contributes to the mnode:
 *   - the message handlers (transaction responses and vgroup requests),
 *   - the SHOW retrieve / free-iterator handlers for vgroups and vnodes,
 *   - the SDB table description for SDB_VGROUP.
 *
 * Returns whatever sdbSetTable() returns for the vgroup table.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  // Every response type listed here is handled by mndTransProcessRsp.
  const int32_t transRspTypes[] = {
      TDMT_DND_CREATE_VNODE_RSP,
      TDMT_VND_ALTER_REPLICA_RSP,
      TDMT_VND_ALTER_CONFIG_RSP,
      TDMT_VND_ALTER_CONFIRM_RSP,
      TDMT_VND_SET_KEEP_VERSION_RSP,
      TDMT_VND_ALTER_HASHRANGE_RSP,
      TDMT_DND_DROP_VNODE_RSP,
      TDMT_VND_COMPACT_RSP,
      TDMT_VND_SCAN_RSP,
      TDMT_VND_DISABLE_WRITE_RSP,
      TDMT_SYNC_FORCE_FOLLOWER_RSP,
      TDMT_VND_ALTER_ELECTBASELINE_RSP,
      TDMT_DND_ALTER_VNODE_TYPE_RSP,
      TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP,
      TDMT_SYNC_CONFIG_CHANGE_RSP,
  };
  const int32_t numOfRspTypes = (int32_t)(sizeof(transRspTypes) / sizeof(transRspTypes[0]));

  for (int32_t idx = 0; idx < numOfRspTypes; ++idx) {
    mndSetMsgHandle(pMnode, transRspTypes[idx], mndTransProcessRsp);
  }

  // Requests processed by this module itself.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  // SHOW support for the vgroup and vnode tables.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  // SDB table description; fields not assigned below stay zero.
  SSdbTable vgroupTable = {0};
  vgroupTable.sdbType = SDB_VGROUP;
  vgroupTable.keyType = SDB_KEY_INT32;
  vgroupTable.encodeFp = (SdbEncodeFp)mndVgroupActionEncode;
  vgroupTable.decodeFp = (SdbDecodeFp)mndVgroupActionDecode;
  vgroupTable.insertFp = (SdbInsertFp)mndVgroupActionInsert;
  vgroupTable.updateFp = (SdbUpdateFp)mndVgroupActionUpdate;
  vgroupTable.deleteFp = (SdbDeleteFp)mndVgroupActionDelete;
  vgroupTable.validateFp = (SdbValidateFp)mndNewVgActionValidate;

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
98

99
/* Cleanup hook of the vgroup module; it intentionally performs no work. */
void mndCleanupVgroup(SMnode *pMnode) {
}
543,436✔
100

101
/*
 * Serializes a vgroup object into a freshly allocated SDB raw record.
 *
 * Field order: vgId, createdTime, updateTime, version, hashBegin, hashEnd,
 * dbName, dbUid, isTsma, replica, one dnodeId per replica, syncConfChangeVer,
 * mountVgId, keepVersion, keepVersionTime, then VGROUP_RESERVE_SIZE reserved
 * bytes.
 *
 * Returns the raw record, or NULL with terrno set when allocation or any
 * write step fails; in that case the raw record is released here.
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  // NOTE(review): code/lino are not referenced directly in this function;
  // presumably the SDB_SET_* macros use them - confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Pessimistic default: only cleared once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)

  // Only the dnode id of each member is persisted.
  for (int8_t member = 0; member < pVgroup->replica; ++member) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[member].dnodeId, _OVER)
  }

  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
143

144
/*
 * Rebuilds a vgroup row from an SDB raw record.
 *
 * Accepts soft versions 1..VGROUP_VER_NUMBER. The fixed leading fields are
 * always read; the trailing fields (syncConfChangeVer, mountVgId,
 * keepVersion, keepVersionTime, reserved bytes) are read only when the
 * remaining data length shows they are present, so shorter records written
 * by older versions still decode.
 *
 * For records older than VGROUP_VER_COMPAT_MOUNT_KEEP_VER, keepVersion is
 * forced to -1 and keepVersionTime to 0, and mountVgId is reset to 0 when the
 * length after syncConfChangeVer equals DLEN_AFTER_SYNC_CONF_CHANGE_VER.
 *
 * Returns the new row, or NULL with terrno set on failure (the row is freed).
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not referenced directly in this function;
  // presumably the SDB_GET_* macros use them - confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)

  // replica comes straight from persisted bytes and drives writes into
  // vnodeGid[] below. Reject counts beyond the number of entries the update
  // action copies for that array, so a corrupt record cannot write past it.
  if (pVgroup->replica > TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    terrno = TSDB_CODE_APP_ERROR;
    goto _OVER;
  }

  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
    if (pVgroup->replica == 1) {
      // A single-member group has no peer, so the member is marked leader.
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
    }
    pVgid->snapSeq = -1;
  }

  // Optional trailing fields: each is read only if the record is long enough
  // to hold it together with the reserved area.
  if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }

  // Remaining length at this point, used by the compatibility rule below.
  int32_t dlenAfterSyncConfChangeVer = pRaw->dataLen - dataPos;
  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersion, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER)
  }
  if (dataPos + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  }

  if (sver < VGROUP_VER_COMPAT_MOUNT_KEEP_VER) {
    if (dlenAfterSyncConfChangeVer == DLEN_AFTER_SYNC_CONF_CHANGE_VER) {
      pVgroup->mountVgId = 0;
    }
    pVgroup->keepVersion = -1;
    pVgroup->keepVersionTime = 0;
  }

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}
222

223
/*
 * Validates a raw record describing a new vgroup.
 *
 * The record is decoded into a temporary row. Validation fails when decoding
 * fails (terrno if set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL) or when
 * the current max vgroup id reported by the SDB is greater than the decoded
 * vgId (returns -1). The temporary row is always released before returning.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int     code = -1;

  SSdbRow *pRow = mndVgroupActionDecode(pRaw);
  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }
  if (pVgroup == NULL) {
    // Either decoding or fetching the row object failed.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (pVgroup->vgId < maxVgId) {
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  } else {
    code = 0;
  }

_OVER:
  if (pVgroup != NULL) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
254

255
/* SDB insert hook for vgroup rows: emits a trace line and always reports success. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t result = 0;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return result;
}
259

260
/* SDB delete hook for vgroup rows: emits a trace line and always reports success. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t result = 0;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return result;
}
264

265
/*
 * SDB update hook: merges a newly decoded vgroup row (pNew) into the row
 * already held in memory (pOld).
 *
 * Persisted attributes (times, version, hash range, replica count, tsma flag,
 * keep-version info, member list, syncConfChangeVer, dbName) are taken from
 * pNew. Runtime-only state is carried the other way so it survives the
 * update: per-member sync state is copied from the matching old member into
 * pNew before the member array is copied back, and the table / storage
 * counters of pOld are copied into pNew.
 *
 * Always returns 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Capture the old member count before pOld->replica is overwritten below.
  // Using the overwritten value as the bound would skip old members whenever
  // the replica count shrinks, dropping their runtime sync state.
  const int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;

  // Carry runtime sync state over for every member present in both rows.
  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
        pNewGid->learnerProgress = pOldGid->learnerProgress;
        pNewGid->snapSeq = pOldGid->snapSeq;
        pNewGid->syncTotalIndex = pOldGid->syncTotalIndex;
      }
    }
  }

  // Counters are not taken from the new row; keep the in-memory values.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  // The merged member array replaces the old one wholesale.
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);
  return 0;
}
304

305
/*
 * Acquires the vgroup with the given id from the SDB.
 *
 * Returns the object on success. On failure returns NULL; if the SDB reported
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_VGROUP_NOT_EXIST, otherwise terrno is left untouched.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
313

314
/* Hands a vgroup object back to the SDB via sdbRelease(). */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
177,821,421✔
318

319
/*
 * Builds the serialized create-vnode request sent to pDnode for pVgroup.
 *
 * Database-level settings come from pDb->cfg and vgroup-level settings from
 * pVgroup. Each member of the vgroup is resolved to its dnode endpoint and
 * placed in replicas[] (voters) or learnerReplicas[] (all other roles).
 * selfIndex / learnerSelfIndex is the slot of pDnode inside the respective
 * array.
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented, and the new value
 * is sent as changeVersion. The increment happens even if a later step fails.
 *
 * On success returns a buffer allocated with taosMemoryMalloc and stores its
 * length in *pContLen. Returns NULL on failure: a member's dnode cannot be
 * acquired, pDnode is not a member (TSDB_CODE_APP_ERROR), or sizing,
 * allocation or serialization fails.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.cacheLastShardBits = pDb->cfg.cacheLastShardBits;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;
  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
  // Bump the vgroup's config-change version once; the request carries the new value.
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);
  memset(createReq.encryptAlgrName, 0, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pDb->cfg.encryptAlgorithm > 0) {
    mndGetEncryptOsslAlgrNameById(pMnode, pDb->cfg.encryptAlgorithm, createReq.encryptAlgrName);
  }
  createReq.isAudit = pDb->cfg.isAudit ? 1 : 0;
  createReq.allowDrop = pDb->cfg.allowDrop;
  createReq.securityLevel = pDb->cfg.securityLevel;
  createReq.secureDelete = pDb->cfg.secureDelete;
  int32_t code = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    int32_t    isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Next free slot of the array that matches the member's role.
    SReplica *pReplica =
        isVoter ? &createReq.replicas[createReq.replica] : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (isVoter) {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  // The target dnode must be one of the members.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    // Label learners distinctly so their indexes are not confused with voter indexes.
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
457

458
/*
 * Builds the alter-vnode-config message for pVgroup from the current
 * configuration of pDb.
 *
 * The returned buffer starts with an SMsgHead (contLen and vgId stored with
 * htonl) followed by the serialized SAlterVnodeConfigReq. *pContLen receives
 * the total length, header included.
 *
 * Returns a buffer allocated with taosMemoryMalloc, or NULL with terrno set
 * to TSDB_CODE_OUT_OF_MEMORY when sizing, allocation or serialization fails.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.cacheLastShardBits = pDb->cfg.cacheLastShardBits;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;
  alterReq.allowDrop = (int8_t)pDb->cfg.allowDrop;
  alterReq.securityLevel = (int8_t)pDb->cfg.securityLevel;
  alterReq.secureDelete = pDb->cfg.secureDelete;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // First pass with a NULL buffer only computes the encoded body size.
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body is written after the header, so only bodyLen bytes remain;
  // passing the full contLen would overstate the writable space by the
  // header size.
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
512

513
/*
 * Builds the serialized alter-vnode-replica request for the member of
 * pVgroup that lives on dnode dnodeId.
 *
 * Each member is resolved to its dnode endpoint and placed in replicas[]
 * (voters) or learnerReplicas[] (all other roles). selfIndex /
 * learnerSelfIndex is the slot of the target dnode inside the respective
 * array, the same convention mndBuildCreateVnodeReq uses. The position in
 * vnodeGid[] is not used because it differs from the slot as soon as voters
 * and learners are mixed.
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented, and the new value
 * is sent as changeVersion. The increment happens even if a later step fails.
 *
 * On success returns a buffer allocated with taosMemoryMalloc and stores its
 * length in *pContLen. Returns NULL on failure: a member's dnode cannot be
 * acquired, dnodeId is not a member (TSDB_CODE_APP_ERROR), or sizing,
 * allocation or serialization fails (TSDB_CODE_OUT_OF_MEMORY).
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    int32_t    isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica  *pReplica = NULL;
    int32_t    slot = 0;  // index of this member inside its role-specific array

    if (isVoter) {
      slot = alterReq.replica++;
      pReplica = &alterReq.replicas[slot];
    } else {
      slot = alterReq.learnerReplica++;
      pReplica = &alterReq.learnerReplicas[slot];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = slot;
      } else {
        alterReq.learnerSelfIndex = slot;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  // The target dnode must be one of the members.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
595

596
/*
 * Builds the serialized check-learner-catchup request for the member of
 * pVgroup that lives on dnode dnodeId.
 *
 * Each member is resolved to its dnode endpoint and placed in replicas[]
 * (voters) or learnerReplicas[] (all other roles). selfIndex /
 * learnerSelfIndex is the slot of the target dnode inside the respective
 * array, the same convention mndBuildCreateVnodeReq uses.
 *
 * NOTE(review): the request is encoded with tSerializeSAlterVnodeReplicaReq;
 * presumably SCheckLearnCatchupReq shares the layout of
 * SAlterVnodeReplicaReq - confirm against the type definitions.
 *
 * On success returns a buffer allocated with taosMemoryMalloc and stores its
 * length in *pContLen. Returns NULL on failure: a member's dnode cannot be
 * acquired, dnodeId is not a member (TSDB_CODE_APP_ERROR), or sizing,
 * allocation or serialization fails (TSDB_CODE_OUT_OF_MEMORY).
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    int32_t    isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica  *pReplica = NULL;
    int32_t    slot = 0;  // index of this member inside its role-specific array

    if (isVoter) {
      slot = req.replica++;
      pReplica = &req.replicas[slot];
    } else {
      slot = req.learnerReplica++;
      pReplica = &req.learnerReplicas[slot];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = slot;
      } else {
        req.learnerSelfIndex = slot;
      }
    }
  }

  mInfo(
      "vgId:%d, build check learner catchup req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d "
      "strict:%d",
      req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  // The target dnode must be one of the members.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize check learner catchup req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
673

674
// Build a serialized "disable vnode write" request for vgroup `vgId`.
//
// Returns a buffer allocated with taosMemoryMalloc (owned by the caller) and
// stores its length in *pContLen. On any failure returns NULL with terrno set
// to TSDB_CODE_OUT_OF_MEMORY; *pContLen is not written in that case.
// pMnode and pDb are not read by this function.
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {0};
  disableReq.vgId = vgId;
  disableReq.disable = 1;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // The NULL-buffer call's return value is used as the allocation size below.
  int32_t encodedLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, encodedLen, &disableReq) >= 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  // Log first: terrstr() is read before terrno is overwritten below.
  mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
702

703
// Build a serialized "alter vnode hash range" request that moves the hash
// range [pVgroup->hashBegin, pVgroup->hashEnd] from srcVgId to pVgroup->vgId.
//
// Side effect: pVgroup->syncConfChangeVer is incremented, and the incremented
// value is sent as the request's changeVersion (this happens even if a later
// step fails).
//
// Returns a taosMemoryMalloc'ed buffer owned by the caller and stores its
// length in *pContLen; returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on
// failure. pMnode is not read by this function.
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq alterReq = {0};
  alterReq.srcVgId = srcVgId;
  alterReq.dstVgId = pVgroup->vgId;
  alterReq.hashBegin = pVgroup->hashBegin;
  alterReq.hashEnd = pVgroup->hashEnd;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // The NULL-buffer call's return value is used as the allocation size below.
  int32_t encodedLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, encodedLen, &alterReq) >= 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  // Log first: terrstr() is read before terrno is overwritten below.
  mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
735

736
// Build a serialized "drop vnode" request for the vnode of pVgroup hosted on
// pDnode. The request carries the dnode id, vgroup id, database full name
// (TSDB_DB_FNAME_LEN bytes copied from pDb->name) and database uid.
//
// Returns a taosMemoryMalloc'ed buffer owned by the caller and stores its
// length in *pContLen; returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on
// failure. pMnode is not read by this function.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);

  // The NULL-buffer call's return value is used as the allocation size below.
  int32_t encodedLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, encodedLen, &dropReq) >= 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  // Log first: terrstr() is read before terrno is overwritten below.
  mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
765

766
// sdbTraverse callback: clears the per-dnode vnode and "other node" counters
// of the visited dnode. Always returns true; pMnode, p1, p2 and p3 are unused.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pVisited = (SDnodeObj *)pObj;

  pVisited->numOfOtherNodes = 0;
  pVisited->numOfVnodes = 0;
  return true;
}
772

773
// sdbTraverse callback that collects candidate dnodes into an array.
//
//   p1: SArray of SDnodeObj receiving a copy of every accepted dnode
//   p2: pointer to the id of a dnode that must be skipped
//   p3: optional SArray of int32_t dnode ids; when non-NULL only dnodes whose
//       id appears in it are considered (an empty list accepts none, TS-6191)
//
// For every considered dnode the vnode count and used memory are refreshed,
// numOfOtherNodes is bumped when the dnode also hosts an mnode, and the dnode
// is pushed only if it is online and supports at least one vnode.
// Returns false only when taosArrayPush fails; true otherwise.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj    *pDnode = pObj;
  SArray       *pOut = p1;
  const int32_t skipDnodeId = *(int32_t *)p2;
  SArray       *pAllowed = p3;

  if (pDnode->id == skipDnodeId) {
    return true;
  }

  if (pAllowed != NULL) {
    int32_t numAllowed = taosArrayGetSize(pAllowed);
    bool    allowed = false;
    for (int32_t k = 0; k < numAllowed && !allowed; ++k) {
      allowed = (*(int32_t *)taosArrayGet(pAllowed, k) == pDnode->id);
    }
    if (!allowed) {
      return true;  // not listed, or the list is empty (TS-6191)
    }
  }

  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  if (isMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (online && pDnode->numOfSupportVnodes > 0 && taosArrayPush(pOut, pDnode) == NULL) {
    return false;
  }
  return true;
}
819

820
// Returns true when dnodeId occurs in dnodeList, an SArray whose elements are
// read as int32_t dnode ids; false otherwise (including for an empty list).
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t remaining = taosArrayGetSize(dnodeList);
  int32_t pos = 0;

  while (pos < remaining) {
    if (*(int32_t *)TARRAY_GET_ELEM(dnodeList, pos) == dnodeId) {
      return true;
    }
    ++pos;
  }
  return false;
}
830

831
#ifdef TD_ENTERPRISE
832
// Load score used by the dual-replica dnode ordering: (vnodes + other nodes
// weighted by `ratio` + additionDnodes) divided by the number of supported
// vnodes. The score is negated for dnodes that already host vnodes, so those
// compare lower than dnodes without any vnode.
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;

  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
837

838
// Comparator for the dual-replica ordering: ascending by mndGetDnodeScore1
// (ratio 0.9, no additional dnodes), ties broken by ascending dnode id.
// Returns -1, 0 or 1.
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhsScore = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhsScore = mndGetDnodeScore1(pDnode2, 0, 0.9);

  if (lhsScore != rhsScore) {
    return lhsScore > rhsScore ? 1 : -1;
  }

  // Equal scores: order by id so the sort result is deterministic.
  return (pDnode1->id > pDnode2->id) - (pDnode1->id < pDnode2->id);
}
849

850
// sdbTraverse callback for the dual-replica check: refreshes the visited
// dnode's vnode count, bumps numOfOtherNodes when it also hosts an mnode, and
// copies it into the SArray passed as p1 when it supports at least one vnode.
// Unlike mndBuildDnodesArrayFp, the online state is not consulted.
// Returns false only when taosArrayPush fails. p2 and p3 are unused.
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;

  const bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
866

867
// TS-6191
868
// Dual-replica restriction (TS-6191): when grantCheckDualReplicaDnodes() holds,
// build in *ppDnodeList the set of dnodes that may receive vnodes.
//
// The candidates are all dnodes supporting vnodes, sorted by
// mndCompareDnodeVnodes1 (dnodes already hosting vnodes come first).
//   - Without a user-supplied dnodeList, only the first two candidates remain.
//   - With a dnodeList, when two or more dnodes already host vnodes the set is
//     first cut to the first two; then every candidate not in dnodeList is
//     dropped. When exactly one dnode hosts vnodes and it did not survive the
//     filter, the last remaining candidate is dropped as well.
//
// When the grant check does not hold, *ppDnodeList is left untouched and 0 is
// returned. Returns terrno if the array cannot be allocated, 0 otherwise.
// On success the caller owns *ppDnodeList.
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SArray *pCandidates = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCandidates;

  SSdb *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCandidates, NULL, NULL);

  int32_t numCandidates = taosArrayGetSize(pCandidates);
  if (numCandidates <= 0) {
    TAOS_RETURN(code);
  }
  if (numCandidates > 1) taosArraySort(pCandidates, (__compar_fn_t)mndCompareDnodeVnodes1);

  int32_t numRequested = taosArrayGetSize(dnodeList);
  if (numRequested <= 0) {
    // No explicit dnode list: keep only the two best-ranked candidates.
    if (numCandidates > 2) taosArrayRemoveBatch(pCandidates, 2, numCandidates - 2, NULL);
    TAOS_RETURN(code);
  }

  // Count the leading candidates that already host vnodes.
  int32_t numHosting = 0;
  while (numHosting < numCandidates &&
         ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, numHosting))->numOfVnodes > 0) {
    ++numHosting;
  }

  int32_t soleHostId = -1;
  if (numHosting == 1) {
    soleHostId = ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, 0))->id;
  } else if (numHosting >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCandidates, 2, numCandidates - 2, NULL);
  }

  // Drop every candidate the caller did not list; the index only advances
  // when the current element is kept.
  int32_t pos = 0;
  while (pos < TARRAY_SIZE(pCandidates)) {
    SDnodeObj *pDnode = taosArrayGet(pCandidates, pos);
    if (isDnodeInList(dnodeList, pDnode->id)) {
      ++pos;
    } else {
      taosArrayRemove(pCandidates, pos);
    }
  }

  if (numHosting == 1) {
    SDnodeObj *pFirst = taosArrayGet(pCandidates, 0);
    if (pFirst != NULL && pFirst->id != soleHostId) {
      // the first dnode is not in dnodeList, remove the last element
      taosArrayRemove(pCandidates, taosArrayGetSize(pCandidates) - 1);
    }
  }

  TAOS_RETURN(code);
}
926
#endif
927

928
// Build an array (elements are SDnodeObj copies) of the dnodes that can take
// new vnodes.
//
//   exceptDnodeId: id of a dnode to leave out
//   dnodeList:     optional list of int32_t dnode ids restricting the
//                  candidates; a NULL or empty list means "no restriction"
//
// In enterprise builds the dual-replica check may produce a stricter list,
// which then replaces dnodeList as the filter. Only online dnodes supporting
// vnodes end up in the result (see mndBuildDnodesArrayFp).
// Returns NULL on failure; the caller owns the returned array.
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
  SArray *pDualReplicaList = NULL;

  SArray *pResult = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // An empty user list is treated the same as no list at all.
  SArray *pUserList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
#ifdef TD_ENTERPRISE
  if (0 != mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, pUserList, &pDualReplicaList)) {
    taosArrayDestroy(pResult);
    return NULL;
  }
#endif
  SArray *pFilter = (pDualReplicaList != NULL) ? pDualReplicaList : pUserList;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId, pFilter);

  int32_t numBuilt = (int32_t)taosArrayGetSize(pResult);
  mDebug("build %d dnodes array", numBuilt);
  for (int32_t idx = 0; idx < numBuilt; ++idx) {
    SDnodeObj *pDnode = taosArrayGet(pResult, idx);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  taosArrayDestroy(pDualReplicaList);
  return pResult;
}
959

960
// Three-way comparison of two dnode ids: -1 if *dnode1Id is smaller, 0 if
// equal, 1 if larger.
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  const int32_t lhs = *dnode1Id;
  const int32_t rhs = *dnode2Id;

  if (lhs > rhs) {
    return 1;
  }
  if (lhs < rhs) {
    return -1;
  }
  return 0;
}
966

967
// Load score of a dnode: (vnodes + other nodes weighted by `ratio` +
// additionDnodes) divided by the number of vnodes the dnode supports.
// A lower score means a less loaded dnode.
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  const float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  return load / pDnode->numOfSupportVnodes;
}
971

972
// Comparator ordering dnodes by ascending mndGetDnodeScore (ratio 0.9, no
// additional dnodes). Returns 1, 0 or -1; equal scores compare as equal.
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhsScore = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhsScore = mndGetDnodeScore(pDnode2, 0, 0.9);

  if (lhsScore > rhsScore) {
    return 1;
  }
  if (lhsScore == rhsScore) {
    return 0;
  }
  return -1;
}
980

981
// Sort the first `replica` entries of pVgroup->vnodeGid in place by ascending
// dnodeId (bubble sort; entries with equal dnodeId keep their relative order).
void mndSortVnodeGid(SVgObj *pVgroup) {
  // After each pass the largest remaining dnodeId has settled at index `last`.
  for (int32_t last = pVgroup->replica - 1; last > 0; --last) {
    for (int32_t k = 0; k < last; ++k) {
      if (pVgroup->vnodeGid[k].dnodeId > pVgroup->vnodeGid[k + 1].dnodeId) {
        TSWAP(pVgroup->vnodeGid[k], pVgroup->vnodeGid[k + 1]);
      }
    }
  }
}
3,284,237✔
990

991
// Pick dnodes for every replica of pVgroup from pArray (SDnodeObj copies).
//
// pArray is sorted in place by ascending load score and the first
// pVgroup->replica entries are used. For each chosen dnode the function
// checks vnode capacity and free memory, then records the dnode id and an
// initial sync state (leader for a single replica, follower otherwise) in
// pVgroup->vnodeGid, and updates memUsed/numOfVnodes on the array element so
// later allocations from the same array see the new load.
// Finally vnodeGid is sorted by dnode id.
//
// Returns 0 on success, or TSDB_CODE_MND_NO_ENOUGH_DNODES /
// TSDB_CODE_MND_NO_ENOUGH_VNODES / TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE.
// On failure, replicas processed before the failing one stay recorded.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t size = taosArrayGetSize(pArray);
  for (int32_t idx = 0; idx < size; ++idx) {
    SDnodeObj *pSorted = taosArrayGet(pArray, idx);
    mDebug("dnode:%d, score:%f", pSorted->id, mndGetDnodeScore(pSorted, 0, 0.9));
  }

  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1040

1041
// Initialise pVgroup as a new single-replica TSMA vgroup of database pDb and
// place it on an available dnode.
//
// The vgroup gets the next free vgroup id, creation/update time "now",
// version 1, the database name and uid, and WAL keep-version disabled.
//
// Returns 0 on success. On failure returns a non-zero error code: the code
// reported by mndBuildDnodesArray (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL
// when terrno is unset) or the code returned by mndGetAvailableDnode.
// The temporary dnode array is released on every path.
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  // Release the array before checking the result so a failed allocation does
  // not leak it, and hand the specific error code back to the caller.
  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  taosArrayDestroy(pArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1067

1068
// Allocate and initialise all vgroups of a new database.
//
// Creates pDb->cfg.numOfVgroups SVgObj entries with consecutive vgroup ids
// (starting at the current maximum id, but never below 2), splits the
// uint32 hash space evenly between them (the last vgroup absorbs the
// remainder up to UINT32_MAX), and assigns pDb->cfg.replications replicas of
// each vgroup to dnodes taken from dnodeList (or from all eligible dnodes when
// dnodeList is NULL/empty).
//
// On success stores the calloc'ed array in *ppVgroups (owned by the caller)
// and returns 0. On failure returns a non-zero error code, frees the array
// and leaves *ppVgroups untouched.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time, not that of the first array element.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      // The last vgroup takes whatever the integer division left over.
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1133

1134
// Build the endpoint set of a vgroup: one endpoint (fqdn:port) per replica
// whose dnode can still be acquired. `inUse` is pointed at the replica that is
// leader or assigned leader at the time it is appended. The set is passed
// through epsetSort() before being returned by value.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      const bool leading =
          (pVgid->syncState == TAOS_SYNC_STATE_LEADER) || (pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leading) {
        // numOfEps is the index the endpoint is about to be appended at.
        epset.inUse = epset.numOfEps;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  epsetSort(&epset);
  return epset;
}
1155

1156
// Look up vgroup `vgId` and build its endpoint set: one endpoint (fqdn:port)
// per replica whose dnode can still be acquired, with `inUse` pointing at the
// leader / assigned-leader replica. Returns an all-zero set when the vgroup
// does not exist. Unlike mndGetVgroupEpset, the result is not sorted.
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet epset = {0};

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      const bool leading =
          (pVgid->syncState == TAOS_SYNC_STATE_LEADER) || (pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leading) {
        // numOfEps is the index the endpoint is about to be appended at.
        epset.inUse = epset.numOfEps;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1180

1181
// Retrieve callback for the vgroup listing: fills up to `rows` rows of pBlock,
// one per vgroup, continuing from the iterator stored in pShow->pIter.
//
// When pShow->db is non-empty only vgroups of that database are listed.
// Columns are written in this order: vgroup id, database name, table count,
// then for each of four replica slots (three replicas plus one extra while a
// vnode is being moved) the dnode id, role and apply/commit progress - NULL
// for unused slots - followed by the ready flag, cache usage, cached table
// count, isTsma, mountVgId, keepVersion and keepVersionTime.
//
// Returns the number of rows written, or the error code when a step fails.
//
// NOTE(review): several locals below (pVgDb, pUser, pIterDb, objFName,
// showAll, showIter, dbUid, lino) are not referenced directly in this body;
// presumably the privilege/column macros use them - confirm before removing.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SDbObj   *pDb = NULL;
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VGROUPS, PRIV_OBJ_DB, 0, _OVER);

  if (pShow->db[0] != '\0') {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      goto _OVER;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    // Skip vgroups of other databases when a database filter is active.
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VGROUPS, _OVER);

    // vgroup id
    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    // database name, extracted from the stored full name
    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      // NOTE(review): the fetch iterator is not cancelled on this path.
      goto _OVER;
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    // table count
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    bool       isReady = false;
    bool       isLeaderRestored = false;
    bool       hasFollowerRestored = false;
    ESyncState leaderState = TAOS_SYNC_STATE_OFFLINE;

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      if (i >= pVgroup->replica) {
        // Unused slot: dnode id, role and progress columns are all NULL.
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        continue;
      }

      const SVnodeGid *pGid = &pVgroup->vnodeGid[i];

      // dnode id
      int16_t dnodeId = (int16_t)pGid->dnodeId;
      COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

      bool       exist = false;
      bool       online = false;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode != NULL) {
        exist = true;
        online = mndIsDnodeOnline(pDnode, curMs);
        mndReleaseDnode(pMnode, pDnode);
      }

      // role: "dropping" when the dnode is gone, "offline" when it exists but
      // is not online, otherwise the textual sync state
      char buf1[20] = {0};
      char role[20] = "offline";
      if (!exist) {
        tstrncpy(role, "dropping", sizeof(role));
      } else if (online) {
        const bool leading =
            (pGid->syncState == TAOS_SYNC_STATE_LEADER) || (pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
        if (leading) {
          leaderState = pGid->syncState;
          if (pGid->syncRestore) {
            isLeaderRestored = true;
          }
        } else if (pGid->syncState == TAOS_SYNC_STATE_FOLLOWER && pGid->syncRestore) {
          hasFollowerRestored = true;
        }
        snprintf(role, sizeof(role), "%s", syncStr(pGid->syncState));
      }
      STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

      // apply/commit progress; learners additionally report total index and
      // learner progress, and an in-flight snapshot adds its sequence number
      char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
      char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};

      const bool isLearner = (pGid->syncState == TAOS_SYNC_STATE_LEARNER);
      const bool inSnapshot = (pGid->snapSeq > 0 && pGid->snapSeq < SYNC_SNAPSHOT_SEQ_END);

      if (isLearner) {
        if (pDb != NULL) {
          mInfo("db:%s, learner progress:%d", pDb->name, pGid->learnerProgress);
        } else {
          mInfo("db:null, learner progress:%d", pGid->learnerProgress);
        }

        if (inSnapshot) {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)",
                   pGid->syncAppliedIndex, pGid->syncCommitIndex, pGid->syncTotalIndex, pGid->snapSeq,
                   pGid->learnerProgress);
        } else {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)",
                   pGid->syncAppliedIndex, pGid->syncCommitIndex, pGid->syncTotalIndex, pGid->learnerProgress);
        }
      } else if (inSnapshot) {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)", pGid->syncAppliedIndex,
                 pGid->syncCommitIndex, pGid->snapSeq);
      } else {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pGid->syncAppliedIndex, pGid->syncCommitIndex);
      }

      STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
    }

    // Ready rule: three or more replicas, or two replicas with a regular
    // leader, need a restored leader AND a restored follower; a single replica
    // or two replicas with an assigned leader only need the restored leader.
    const bool needsFollower =
        (pVgroup->replica >= 3) || (pVgroup->replica == 2 && leaderState == TAOS_SYNC_STATE_LEADER);
    const bool leaderSuffices =
        (pVgroup->replica < 2) || (pVgroup->replica == 2 && leaderState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (needsFollower) {
      isReady = isLeaderRestored && hasFollowerRestored;
    } else if (leaderSuffices) {
      isReady = isLeaderRestored;
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&isReady, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int64_t cacheUsage = (int64_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersion, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersionTime, false, pVgroup, pShow->pIter, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1380

1381
// Cancel an in-progress vgroup iteration by handing pIter back to the sdb via
// sdbCancelFetchByType for the SDB_VGROUP table.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1385

1386
// Callback handed to sdbTraverse by mndGetVnodesNum.
//   pObj : the vgroup being visited
//   p1   : points at the dnode id to match
//   p2   : points at the int32_t counter, bumped once per replica located on that dnode
//   p3   : unused
// Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnodeId = *(int32_t *)p1;
  int32_t      *pCounter = (int32_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnodeId) {
      *pCounter += 1;
    }
    ++idx;
  }

  return true;
}
1399

1400
// Counts the vnode replicas placed on dnodeId by traversing every SDB_VGROUP object
// with mndGetVnodesNumFp and returning the accumulated counter.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t total = 0;
  SSdb   *pSdb = pMnode->pSdb;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);

  return total;
}
1405

1406
// Computes the memory figure of one vgroup from its database configuration:
//   cfg.buffer * 1024 * 1024 + cfg.pages * cfg.pageSize * 1024,
//   plus cfg.cacheLastSize * 1024 * 1024 when cfg.cacheLast > 0.
//
// pDbInput may be NULL; in that case the database is acquired by pVgroup->dbName for
// the duration of the call and released before returning. Returns 0 when no database
// object is available.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool acquiredHere = (pDbInput == NULL);
  SDbObj    *pDb = pDbInput;
  int64_t    totalBytes = 0;

  if (acquiredHere) {
    pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  }

  if (pDb != NULL) {
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;

    totalBytes = buffer + cache;
    // cacheLast is always logged, but only counted when the feature is switched on
    if (pDb->cfg.cacheLast > 0) {
      totalBytes += cacheLast;
    }
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  // only drop the reference this function took itself
  if (acquiredHere) {
    mndReleaseDb(pMnode, pDb);
  }
  return totalBytes;
}
1430

1431
// Callback handed to sdbTraverse by mndGetVnodesMemory.
//   pObj : the vgroup being visited
//   p1   : points at the dnode id to match
//   p2   : points at the int64_t accumulator; mndGetVgroupMemory(pMnode, NULL, vgroup)
//          is added once per replica located on that dnode
//   p3   : unused
// Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnodeId = *(int32_t *)p1;
  int64_t      *pTotal = (int64_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnodeId) {
      *pTotal += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
    ++idx;
  }

  return true;
}
1444

1445
// Sums mndGetVgroupMemory over every vnode replica placed on dnodeId by traversing
// all SDB_VGROUP objects with mndGetVnodeMemroyFp.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t total = 0;
  SSdb   *pSdb = pMnode->pSdb;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);

  return total;
}
1450

1451
// Formats the estimated time needed to apply `applyCount` pending entries at `rate`
// as "hours:minutes:seconds" (no zero padding) into restoreStr.
//
//   rate           : divisor for applyCount; the quotient is treated as milliseconds
//   applyCount     : number of entries still to be applied
//   restoreStr     : output buffer
//   restoreStrSize : size of restoreStr in bytes (output is truncated by snprintf)
//
// A rate of 0 yields "0:0:0". The quotient is clamped to the int64_t range (and NaN is
// treated as 0) before conversion, because converting an out-of-range or NaN double to
// an integer type is undefined behavior in C.
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  if (rate == 0) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  double  costMs = (double)applyCount / rate;
  int64_t costTime = 0;
  if (costMs != costMs) {
    // NaN (e.g. rate is NaN): no meaningful estimate
    costTime = 0;
  } else if (costMs >= (double)INT64_MAX) {
    costTime = INT64_MAX;
  } else if (costMs <= (double)INT64_MIN) {
    costTime = INT64_MIN;
  } else {
    costTime = (int64_t)costMs;
  }

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1465

1466
// Fills pBlock with one row per vnode replica, iterating the SDB_VGROUP table from
// pShow->pIter.
//
// Columns written per row, in order: dnode id, vgroup id, db name, sync state string,
// role time, start time, syncRestore, estimated restore time string, unapplied count
// (syncCommitIndex - syncAppliedIndex), bufferSegmentUsed, bufferSegmentSize.
//
// New vgroups are only fetched while numOfRows < rows - TSDB_MAX_REPLICA, so a vgroup
// that has been fetched still has room for its replicas.
//
// Returns the number of rows produced (also added to pShow->numOfRows), or the error
// code when one of the check/set macros jumped to _OVER with code != 0.
//
// NOTE(review): pVgDb, pIterDb, objFName, showAll, showIter and dbUid are not referenced
// directly in this body; they are kept because the MND_SHOW_CHECK_* macros are defined
// outside this view and may reference them — confirm before removing.
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SUserObj *pUser = NULL;
  SDbObj   *pDb = NULL, *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VNODES, PRIV_OBJ_DB, 0, _OVER);

  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VNODES, _OVER);

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->dnodeId, false, pVgroup, pShow->pIter, _OVER);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)b1, false, pVgroup, pShow->pIter, _OVER);

      // dnode is online?
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        // abandon the remaining replicas of this vgroup; numOfRows was not advanced for this row
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
      sdbRelease(pSdb, pDnode);

      // replicas on an offline dnode are reported as offline with zeroed timestamps
      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&roleTimeMs, false, pVgroup, pShow->pIter, _OVER);

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&startTimeMs, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->syncRestore, false, pVgroup, pShow->pIter, _OVER);

      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      // restoreStr can hold up to 19 characters; together with the varstr header that can
      // exceed buf's 20 bytes, so use the size-bounded copy instead of STR_TO_VARSTR
      STR_WITH_MAXSIZE_TO_VARSTR(buf, restoreStr, sizeof(buf));
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&unappliedCount, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentUsed, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentSize, false, pVgroup, pShow->pIter, _OVER);

      numOfRows++;
    }
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve vnode info at line %d since %s", lino, tstrerror(code));
    return code;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1568

1569
// Cancels an in-progress vnode listing. The listing iterates the SDB_VGROUP table,
// so the iterator is cancelled against that table type.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1573

1574
// Adds one replica to pVgroup on the first suitable dnode of pArray and appends the
// updated vgroup to the transaction's group redo log.
//
// pArray is sorted in place with mndCompareDnodeVnodes and then scanned front to back;
// the first dnode that does not already host a replica of this vgroup is chosen.
// The chosen dnode's numOfVnodes and memUsed, and pVgroup->replica, are updated in place.
//
// Returns 0 on success, otherwise:
//   TSDB_CODE_MND_NO_ENOUGH_DNODES       - no unused dnode in pArray (or a NULL entry)
//   TSDB_CODE_MND_NO_ENOUGH_VNODES       - candidate already at numOfSupportVnodes
//   TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE - candidate lacks memory for the vgroup
//   or the error from encoding / appending / setting the raw status.
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  // slot right after the current replicas; filled in once a dnode is chosen
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // must be checked before pDnode->id is read below
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    // exactly one replica is added per call
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1644

1645
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
20,036✔
1646
                                        SVnodeGid *pDelVgid) {
1647
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
20,036✔
1648
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
75,095✔
1649
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
55,059✔
1650
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
55,059✔
1651
          pDnode->numOfOtherNodes);
1652
  }
1653

1654
  int32_t code = -1;
20,036✔
1655
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
23,322✔
1656
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
21,571✔
1657

1658
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
42,516✔
1659
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
39,230✔
1660
      if (pVgid->dnodeId == pDnode->id) {
39,230✔
1661
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
18,285✔
1662
        pDnode->memUsed -= vgMem;
18,285✔
1663
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
18,285✔
1664
              " used:%" PRId64,
1665
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1666
        pDnode->numOfVnodes--;
18,285✔
1667
        pVgroup->replica--;
18,285✔
1668
        *pDelVgid = *pVgid;
18,285✔
1669
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
18,285✔
1670
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
18,285✔
1671
        code = 0;
18,285✔
1672
        goto _OVER;
18,285✔
1673
      }
1674
    }
1675
  }
1676

1677
_OVER:
1,751✔
1678
  if (code != 0) {
20,036✔
1679
    code = TSDB_CODE_APP_ERROR;
1,751✔
1680
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,751✔
1681
           tstrerror(code));
1682
    TAOS_RETURN(code);
1,751✔
1683
  }
1684

1685
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
50,765✔
1686
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
32,480✔
1687
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
32,480✔
1688
          pVgid->dnodeId);
1689
  }
1690

1691
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
18,285✔
1692
  if (pVgRaw == NULL) {
18,285✔
1693
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1694
    if (terrno != 0) code = terrno;
×
1695
    TAOS_RETURN(code);
×
1696
  }
1697
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
18,285✔
1698
    sdbFreeRaw(pVgRaw);
×
1699
    TAOS_RETURN(code);
×
1700
  }
1701
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
18,285✔
1702
  if (code != 0) {
18,285✔
1703
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1704
           tstrerror(code), __LINE__);
1705
  }
1706

1707
  TAOS_RETURN(code);
18,285✔
1708
}
1709

1710
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1711
                                                   SVnodeGid *pDelVgid) {
1712
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1713
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1714
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1715
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1716
  }
1717

1718
  int32_t code = -1;
×
1719
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1720
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1721

1722
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1723
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1724
      if (pVgid->dnodeId == pDnode->id) {
×
1725
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1726
        pDnode->memUsed -= vgMem;
×
1727
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1728
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1729
        pDnode->numOfVnodes--;
×
1730
        pVgroup->replica--;
×
1731
        *pDelVgid = *pVgid;
×
1732
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1733
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1734
        code = 0;
×
1735
        goto _OVER;
×
1736
      }
1737
    }
1738
  }
1739

1740
_OVER:
×
1741
  if (code != 0) {
×
1742
    code = TSDB_CODE_APP_ERROR;
×
1743
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1744
    TAOS_RETURN(code);
×
1745
  }
1746

1747
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1748
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1749
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1750
  }
1751

1752
  TAOS_RETURN(code);
×
1753
}
1754

1755
// Appends a TDMT_DND_CREATE_VNODE redo action to pTrans, addressed to the dnode named
// by pVgid->dnodeId.
//
// The dnode reference is held until the request has been built, because the dnode
// object is passed to mndBuildCreateVnodeReq. TSDB_CODE_VND_ALREADY_EXIST is set as the
// action's acceptable code. When the append fails the request buffer is freed here.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0)
// if the dnode cannot be acquired or the request cannot be built; otherwise the error
// from mndTransAppendRedoAction.
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the error before releasing the dnode
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  // pDnode is no longer needed once the request is built
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1781

1782
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
3,455✔
1783
                                       SDnodeObj *pDnode) {
1784
  int32_t      code = 0;
3,455✔
1785
  STransAction action = {0};
3,455✔
1786

1787
  action.epSet = mndGetDnodeEpset(pDnode);
3,455✔
1788

1789
  int32_t contLen = 0;
3,455✔
1790
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
3,455✔
1791
  if (pReq == NULL) {
3,455✔
1792
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1793
    if (terrno != 0) code = terrno;
×
1794
    TAOS_RETURN(code);
×
1795
  }
1796

1797
  action.pCont = pReq;
3,455✔
1798
  action.contLen = contLen;
3,455✔
1799
  action.msgType = TDMT_DND_CREATE_VNODE;
3,455✔
1800
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
3,455✔
1801
  action.groupId = pVgroup->vgId;
3,455✔
1802

1803
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
3,455✔
1804
    taosMemoryFree(pReq);
×
1805
    TAOS_RETURN(code);
×
1806
  }
1807

1808
  TAOS_RETURN(code);
3,455✔
1809
}
1810

1811
// Appends a TDMT_VND_ALTER_CONFIRM redo action to pTrans, addressed to the vgroup's
// epset. The message body is a bare SMsgHead carrying the total length and the vgroup
// id, both converted with htonl.
//
// Returns 0 on success, terrno if the header cannot be allocated, otherwise the error
// from mndTransAppendRedoAction (the header is freed here in that case).
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t      code = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);
  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsgHead = taosMemoryMalloc(headLen);
  if (pMsgHead == NULL) {
    TAOS_RETURN(terrno);
  }

  pMsgHead->contLen = htonl(headLen);
  pMsgHead->vgId = htonl(pVgroup->vgId);

  action.groupId = pVgroup->vgId;
  // an incorrect redirect result will cause this error, so it is set as the retry code
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  action.msgType = TDMT_VND_ALTER_CONFIRM;
  action.contLen = headLen;
  action.pCont = pMsgHead;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsgHead);
  }

  TAOS_RETURN(code);
}
1840

1841
// Appends a TDMT_SYNC_CONFIG_CHANGE redo action to pTrans, addressed to pNewVgroup's
// epset. The message is an SMsgHead (total length and vgroup id, both via htonl)
// followed by the alter-replica request built for pNewVgroup and dnodeId.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// the request cannot be built; terrno if the message cannot be allocated; otherwise the
// error from mndTransAppendRedoAction (the message is freed here in that case).
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      bodyLen = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  void *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);

  // the body goes right behind the header; the standalone copy is no longer needed
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  action.msgType = TDMT_SYNC_CONFIG_CHANGE;
  action.contLen = msgLen;
  action.pCont = pMsg;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
1880

1881
// Appends a TDMT_VND_ALTER_HASHRANGE redo action to pTrans, addressed to pVgroup's
// epset, with a request built from srcVgId and pVgroup. TSDB_CODE_VND_ALREADY_EXIST is
// set as the action's acceptable code.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// the request cannot be built; otherwise the error from mndTransAppendRedoAction (the
// request is freed here in that case).
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pBody = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.msgType = TDMT_VND_ALTER_HASHRANGE;
  action.contLen = reqLen;
  action.pCont = pBody;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
    TAOS_RETURN(code);
  }

  // logged only once the action is part of the transaction
  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1907

1908
// Appends a TDMT_VND_ALTER_CONFIG redo action to pTrans, addressed to pVgroup's epset,
// with a request built from pDb and pVgroup.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// the request cannot be built; otherwise the error from mndTransAppendRedoAction (the
// request is freed here in that case).
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pBody = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.groupId = pVgroup->vgId;
  action.msgType = TDMT_VND_ALTER_CONFIG;
  action.contLen = reqLen;
  action.pCont = pBody;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }

  TAOS_RETURN(code);
}
1933

1934
// Encodes pVg and appends it to pTrans's prepare log, then marks the raw record as
// SDB_STATUS_CREATING.
//
// The raw record is freed here only while it has not yet been handed to the transaction
// (encode failure or append failure). A failure to set the raw status is logged and
// returned to the caller.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// encoding fails; otherwise the error from mndTransAppendPrepareLog or sdbSetRawStatus.
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _err;
  }

  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);

  // pRaw has been appended to the transaction from here on, so it must not be freed below
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }
  pRaw = NULL;
  TAOS_RETURN(code);

_err:
  sdbFreeRaw(pRaw);
  TAOS_RETURN(code);
}
1958

1959
// Appends a TDMT_VND_ALTER_REPLICA redo action to pTrans, addressed to the dnode
// identified by dnodeId, with a request built from pDb, pVgroup and dnodeId.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// the dnode cannot be acquired or the request cannot be built; otherwise the error from
// mndTransAppendRedoAction (the request is freed here in that case).
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // the dnode is only needed for its epset
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.groupId = pVgroup->vgId;
  action.msgType = TDMT_VND_ALTER_REPLICA;
  action.contLen = reqLen;
  action.pCont = pBody;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }

  TAOS_RETURN(code);
}
1992

1993
// Appends a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action to pTrans, addressed to the
// dnode identified by dnodeId. TSDB_CODE_VND_ALREADY_IS_VOTER is set as the acceptable
// code and TSDB_CODE_VND_NOT_CATCH_UP as the retry code.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// the dnode cannot be acquired or the request cannot be built; otherwise the error from
// mndTransAppendRedoAction (the request is freed here in that case).
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // the dnode is only needed for its epset
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  action.contLen = reqLen;
  action.pCont = pBody;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }

  TAOS_RETURN(code);
}
2027

2028
// Appends a TDMT_DND_ALTER_VNODE_TYPE redo action to pTrans, addressed to the dnode
// identified by dnodeId, with a request built by mndBuildAlterVnodeReplicaReq.
// TSDB_CODE_VND_ALREADY_IS_VOTER is set as the acceptable code and
// TSDB_CODE_VND_NOT_CATCH_UP as the retry code.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if
// the dnode cannot be acquired or the request cannot be built; otherwise the error from
// mndTransAppendRedoAction (the request is freed here in that case).
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // the dnode is only needed for its epset
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.groupId = pVgroup->vgId;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  action.contLen = reqLen;
  action.pCont = pBody;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pBody);
  }

  TAOS_RETURN(code);
}
2063

2064
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
3,455✔
2065
                                          SDnodeObj *pDnode) {
2066
  int32_t      code = 0;
3,455✔
2067
  STransAction action = {0};
3,455✔
2068
  action.epSet = mndGetDnodeEpset(pDnode);
3,455✔
2069

2070
  int32_t contLen = 0;
3,455✔
2071
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
3,455✔
2072
  if (pReq == NULL) {
3,455✔
2073
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2074
    if (terrno != 0) code = terrno;
×
2075
    TAOS_RETURN(code);
×
2076
  }
2077

2078
  action.pCont = pReq;
3,455✔
2079
  action.contLen = contLen;
3,455✔
2080
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
3,455✔
2081
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
3,455✔
2082
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
3,455✔
2083
  action.groupId = pVgroup->vgId;
3,455✔
2084

2085
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
3,455✔
2086
    taosMemoryFree(pReq);
×
2087
    TAOS_RETURN(code);
×
2088
  }
2089

2090
  TAOS_RETURN(code);
3,455✔
2091
}
2092

2093
/*
 * Append a TDMT_VND_DISABLE_WRITE redo action for pVgroup->vgId to pTrans,
 * addressed to the dnode identified by dnodeId.
 *
 * The dnode is acquired only long enough to copy its epset into the action.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno when it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. If appending fails, the request buffer is
 * freed here and the append error is returned.
 */
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCont = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pCont == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  redo.pCont = pCont;
  redo.contLen = reqLen;
  redo.msgType = TDMT_VND_DISABLE_WRITE;

  int32_t code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // Freed here only when the append failed.
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
2126

2127
/*
 * Append a TDMT_DND_DROP_VNODE action for the vnode pVgid of pVgroup to pTrans.
 *
 * The action is addressed to the dnode pVgid->dnodeId, grouped by
 * pVgroup->vgId, and registers TSDB_CODE_VND_NOT_EXIST as its acceptable code.
 *
 * @param isRedo  true: append as a redo action; false: append as an undo action.
 *
 * @return 0 on success. If the dnode cannot be acquired or the request cannot
 *         be built, returns terrno when it is non-zero, otherwise
 *         TSDB_CODE_MND_RETURN_VALUE_NULL. If appending fails, the request
 *         buffer is freed here and the append error is returned.
 */
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // pDnode is an argument of mndBuildDropVnodeReq(), so the reference must be
  // held until that call has returned; releasing it earlier would hand the
  // builder an object this function no longer owns a reference to.
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // Read terrno before releasing so the release cannot disturb it.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  code = isRedo ? mndTransAppendRedoAction(pTrans, &action) : mndTransAppendUndoAction(pTrans, &action);
  if (code != 0) {
    // Freed here only when the append failed.
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2169

2170
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
17,058✔
2171
                                    SArray *pArray, bool force, bool unsafe) {
2172
  int32_t code = 0;
17,058✔
2173
  SVgObj  newVg = {0};
17,058✔
2174
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
17,058✔
2175

2176
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
17,058✔
2177
  for (int32_t i = 0; i < newVg.replica; ++i) {
55,074✔
2178
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
38,016✔
2179
  }
2180

2181
  if (!force) {
17,058✔
2182
#if 1
2183
    {
2184
#else
2185
    if (newVg.replica == 1) {
2186
#endif
2187
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
17,058✔
2188
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
17,058✔
2189
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
55,074✔
2190
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
38,016✔
2191
      }
2192
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
17,058✔
2193
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
17,058✔
2194

2195
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
17,058✔
2196
      newVg.replica--;
17,058✔
2197
      SVnodeGid del = newVg.vnodeGid[vnIndex];
17,058✔
2198
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
17,058✔
2199
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
17,058✔
2200
      {
2201
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
17,058✔
2202
        if (pRaw == NULL) {
17,058✔
2203
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2204
          if (terrno != 0) code = terrno;
×
2205
          TAOS_RETURN(code);
×
2206
        }
2207
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
17,058✔
2208
          sdbFreeRaw(pRaw);
×
2209
          TAOS_RETURN(code);
×
2210
        }
2211
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
17,058✔
2212
        if (code != 0) {
17,058✔
2213
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2214
          return code;
×
2215
        }
2216
      }
2217

2218
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
17,058✔
2219
      for (int32_t i = 0; i < newVg.replica; ++i) {
55,074✔
2220
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
38,016✔
2221
      }
2222
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
17,058✔
2223
#if 1
2224
    }
2225
#else
2226
    } else {  // new replica == 3
2227
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2228
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2229
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2230
      newVg.replica--;
2231
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2232
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2233
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2234
      {
2235
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2236
        if (pRaw == NULL) return -1;
2237
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2238
          sdbFreeRaw(pRaw);
2239
          return -1;
2240
        }
2241
      }
2242

2243
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2244
      for (int32_t i = 0; i < newVg.replica; ++i) {
2245
        if (i == vnIndex) continue;
2246
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2247
      }
2248
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2249
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2250
    }
2251
#endif
2252
  } else {
2253
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2254
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2255
    newVg.replica--;
×
2256
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2257
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2258
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2259
    {
2260
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2261
      if (pRaw == NULL) {
×
2262
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2263
        if (terrno != 0) code = terrno;
×
2264
        TAOS_RETURN(code);
×
2265
      }
2266
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2267
        sdbFreeRaw(pRaw);
×
2268
        TAOS_RETURN(code);
×
2269
      }
2270
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2271
      if (code != 0) {
×
2272
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2273
        return code;
×
2274
      }
2275
    }
2276

2277
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2278
      if (i != vnIndex) {
×
2279
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2280
      }
2281
    }
2282
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2283
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2284

2285
    if (newVg.replica == 1) {
×
2286
      if (force && !unsafe) {
×
2287
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2288
      }
2289

2290
      SSdb *pSdb = pMnode->pSdb;
×
2291
      void *pIter = NULL;
×
2292

2293
      while (1) {
×
2294
        SStbObj *pStb = NULL;
×
2295
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2296
        if (pIter == NULL) break;
×
2297

2298
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2299
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2300
            sdbCancelFetch(pSdb, pIter);
×
2301
            sdbRelease(pSdb, pStb);
×
2302
            TAOS_RETURN(code);
×
2303
          }
2304
        }
2305

2306
        sdbRelease(pSdb, pStb);
×
2307
      }
2308

2309
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2310
    }
2311
  }
2312

2313
  {
2314
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
17,058✔
2315
    if (pRaw == NULL) {
17,058✔
2316
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2317
      if (terrno != 0) code = terrno;
×
2318
      TAOS_RETURN(code);
×
2319
    }
2320
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
17,058✔
2321
      sdbFreeRaw(pRaw);
×
2322
      TAOS_RETURN(code);
×
2323
    }
2324
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
17,058✔
2325
    if (code != 0) {
17,058✔
2326
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2327
      return code;
×
2328
    }
2329
  }
2330

2331
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
17,058✔
2332
  for (int32_t i = 0; i < newVg.replica; ++i) {
55,074✔
2333
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
38,016✔
2334
  }
2335
  TAOS_RETURN(code);
17,058✔
2336
}
2337

2338
/*
 * For every vgroup that has a vnode on dnode delDnodeId, add the actions that
 * move that vnode elsewhere to pTrans (see mndSetMoveVgroupInfoToTrans()).
 *
 * Iteration stops at the first vgroup that fails.
 *
 * @param delDnodeId  dnode whose vnodes must be moved away
 * @param force       forwarded to mndSetMoveVgroupInfoToTrans()
 * @param unsafe      forwarded to mndSetMoveVgroupInfoToTrans()
 *
 * @return 0 on success; otherwise the error of the first failing step. When the
 *         dnode array or a vgroup's database cannot be obtained, this is terrno
 *         if non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // Locate the vnode of this vgroup that lives on the dnode being vacated.
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // mndSetMoveVgroupInfoToTrans() dereferences pDb, so a failed acquire
        // must be reported here instead of being passed through.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2381

2382
/*
 * Grow pVgroup by one vnode on dnode newDnodeId and add the matching redo log
 * and redo actions to pTrans.
 *
 * *pVgroup is modified in place: the slot at index `replica` is filled in and
 * `replica` is incremented. The new vnode is first recorded as a learner
 * (offline sync state), then switched to voter before the alter-type action is
 * appended.
 *
 * Returns 0 on success, otherwise the first error reported by a helper (terrno,
 * or TSDB_CODE_MND_RETURN_VALUE_NULL when the encoder returns NULL with
 * terrno == 0).
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // Claim the next free slot for the new vnode.
  SVnodeGid *pAdded = &pVgroup->vnodeGid[pVgroup->replica];
  pVgroup->replica += 1;
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  // Redo log carrying the enlarged vgroup.
  SSdbRaw *pEncoded = mndVgroupActionEncode(pVgroup);
  if (pEncoded == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  code = mndTransAppendRedolog(pTrans, pEncoded);
  if (code != 0) {
    sdbFreeRaw(pEncoded);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pEncoded, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // Learner phase: notify the pre-existing members, then create the new vnode.
  for (int32_t member = 0; member < pVgroup->replica - 1; ++member) {
    TAOS_CHECK_RETURN(
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[member].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // Voter phase: switch the new vnode's role, then notify the members again.
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  for (int32_t member = 0; member < pVgroup->replica - 1; ++member) {
    TAOS_CHECK_RETURN(
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[member].dnodeId));
  }

  // Confirmation action for the new layout.
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2428

2429
/*
 * Shrink pVgroup by removing its vnode on dnode delDnodeId and add the matching
 * redo log and redo actions to pTrans.
 *
 * *pVgroup is modified in place: `replica` is decremented, the last slot is
 * moved over the removed one, and the vacated last slot is zeroed.
 *
 * Returns 0 when the vgroup has no vnode on delDnodeId (nothing is changed) or
 * on success; otherwise the first error reported by a helper (terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when the encoder returns NULL with
 * terrno == 0).
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  int32_t delIndex = -1;
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      delIndex = i;
      break;
    }
  }

  if (delIndex < 0) return 0;

  // Keep a copy of the removed entry for the drop action below.
  SVnodeGid delGid = pVgroup->vnodeGid[delIndex];

  pVgroup->replica--;
  const int32_t lastIndex = pVgroup->replica;
  // When the removed vnode already is the last slot, source and destination
  // are the same object; copying it onto itself with memcpy() would be
  // undefined behavior, so the move is skipped in that case.
  if (delIndex != lastIndex) {
    pVgroup->vnodeGid[delIndex] = pVgroup->vnodeGid[lastIndex];
  }
  memset(&pVgroup->vnodeGid[lastIndex], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2474

2475
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
43,289✔
2476
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2477
                                     SDnodeObj *pOld3) {
2478
  int32_t code = -1;
43,289✔
2479
  STrans *pTrans = NULL;
43,289✔
2480

2481
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
43,289✔
2482
  if (pTrans == NULL) {
43,289✔
2483
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2484
    if (terrno != 0) code = terrno;
×
2485
    goto _OVER;
×
2486
  }
2487

2488
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
43,289✔
2489
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
43,289✔
2490
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);
43,184✔
2491

2492
  mndTransSetSerial(pTrans);
43,184✔
2493
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
43,184✔
2494

2495
  SVgObj newVg = {0};
43,184✔
2496
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
43,184✔
2497
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
43,184✔
2498
  for (int32_t i = 0; i < newVg.replica; ++i) {
144,750✔
2499
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
101,566✔
2500
          syncStr(newVg.vnodeGid[i].syncState));
2501
  }
2502

2503
  if (pNew1 != NULL && pOld1 != NULL) {
43,184✔
2504
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
43,184✔
2505
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
43,184✔
2506
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
778✔
2507
             pNew1->numOfSupportVnodes);
2508
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
778✔
2509
      goto _OVER;
778✔
2510
    }
2511

2512
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
42,406✔
2513
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
42,406✔
2514
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2515
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2516
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2517
      goto _OVER;
×
2518
    } else {
2519
      pNew1->memUsed += vgMem;
42,406✔
2520
    }
2521

2522
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
42,406✔
2523
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
42,406✔
2524
  }
2525

2526
  if (pNew2 != NULL && pOld2 != NULL) {
42,406✔
2527
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
12,767✔
2528
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
12,767✔
2529
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2530
             pNew2->numOfSupportVnodes);
2531
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2532
      goto _OVER;
×
2533
    }
2534
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
12,767✔
2535
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
12,767✔
2536
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2537
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2538
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2539
      goto _OVER;
×
2540
    } else {
2541
      pNew2->memUsed += vgMem;
12,767✔
2542
    }
2543
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
12,767✔
2544
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
12,767✔
2545
  }
2546

2547
  if (pNew3 != NULL && pOld3 != NULL) {
42,406✔
2548
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
5,095✔
2549
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
5,095✔
2550
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2551
             pNew3->numOfSupportVnodes);
2552
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2553
      goto _OVER;
×
2554
    }
2555
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
5,095✔
2556
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
5,095✔
2557
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2558
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2559
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2560
      goto _OVER;
×
2561
    } else {
2562
      pNew3->memUsed += vgMem;
5,095✔
2563
    }
2564
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
5,095✔
2565
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
5,095✔
2566
  }
2567

2568
  {
2569
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
42,406✔
2570
    if (pRaw == NULL) {
42,406✔
2571
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2572
      if (terrno != 0) code = terrno;
×
2573
      goto _OVER;
×
2574
    }
2575
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
42,406✔
2576
      sdbFreeRaw(pRaw);
×
2577
      goto _OVER;
×
2578
    }
2579
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
42,406✔
2580
    if (code != 0) {
42,406✔
2581
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2582
      goto _OVER;
×
2583
    }
2584
  }
2585

2586
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
42,406✔
2587
  for (int32_t i = 0; i < newVg.replica; ++i) {
141,638✔
2588
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
99,232✔
2589
  }
2590

2591
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
42,406✔
2592
  code = 0;
41,008✔
2593

2594
_OVER:
43,289✔
2595
  mndTransDrop(pTrans);
43,289✔
2596
  mndReleaseDb(pMnode, pDb);
43,289✔
2597
  TAOS_RETURN(code);
43,289✔
2598
}
2599

2600
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
52,647✔
2601
  SMnode    *pMnode = pReq->info.node;
52,647✔
2602
  SDnodeObj *pNew1 = NULL;
52,647✔
2603
  SDnodeObj *pNew2 = NULL;
52,647✔
2604
  SDnodeObj *pNew3 = NULL;
52,647✔
2605
  SDnodeObj *pOld1 = NULL;
52,647✔
2606
  SDnodeObj *pOld2 = NULL;
52,647✔
2607
  SDnodeObj *pOld3 = NULL;
52,647✔
2608
  SVgObj    *pVgroup = NULL;
52,647✔
2609
  SDbObj    *pDb = NULL;
52,647✔
2610
  int32_t    code = -1;
52,647✔
2611
  int64_t    curMs = taosGetTimestampMs();
52,647✔
2612
  int32_t    newDnodeId[3] = {0};
52,647✔
2613
  int32_t    oldDnodeId[3] = {0};
52,647✔
2614
  int32_t    newIndex = -1;
52,647✔
2615
  int32_t    oldIndex = -1;
52,647✔
2616
  int64_t    tss = taosGetTimestampMs();
52,647✔
2617

2618
  SRedistributeVgroupReq req = {0};
52,647✔
2619
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
52,647✔
2620
    code = TSDB_CODE_INVALID_MSG;
×
2621
    goto _OVER;
×
2622
  }
2623

2624
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
52,647✔
2625
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
52,647✔
2626
    goto _OVER;
×
2627
  }
2628

2629
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
52,647✔
2630
  if (pVgroup == NULL) {
52,647✔
2631
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
2,334✔
2632
    if (terrno != 0) code = terrno;
2,334✔
2633
    goto _OVER;
2,334✔
2634
  }
2635
  if (pVgroup->mountVgId) {
50,313✔
2636
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
2637
    goto _OVER;
×
2638
  }
2639
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
50,313✔
2640
  if (pDb == NULL) {
50,313✔
2641
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2642
    if (terrno != 0) code = terrno;
×
2643
    goto _OVER;
×
2644
  }
2645

2646
  if (pVgroup->replica == 1) {
50,313✔
2647
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
13,298✔
2648
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2649
      goto _OVER;
×
2650
    }
2651

2652
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
13,298✔
2653
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2654
      code = 0;
×
2655
      goto _OVER;
×
2656
    }
2657

2658
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
13,298✔
2659
    if (pNew1 == NULL) {
13,298✔
2660
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2661
      if (terrno != 0) code = terrno;
×
2662
      goto _OVER;
×
2663
    }
2664
    if (!mndIsDnodeOnline(pNew1, curMs)) {
13,298✔
2665
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2666
      goto _OVER;
×
2667
    }
2668

2669
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
13,298✔
2670
    if (pOld1 == NULL) {
13,298✔
2671
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2672
      if (terrno != 0) code = terrno;
×
2673
      goto _OVER;
×
2674
    }
2675
    if (!mndIsDnodeOnline(pOld1, curMs)) {
13,298✔
2676
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2677
      goto _OVER;
×
2678
    }
2679

2680
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
13,298✔
2681

2682
  } else if (pVgroup->replica == 3) {
37,015✔
2683
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
35,415✔
2684
      code = TSDB_CODE_MND_INVALID_REPLICA;
3,112✔
2685
      goto _OVER;
3,112✔
2686
    }
2687

2688
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
32,303✔
2689
      code = TSDB_CODE_MND_INVALID_REPLICA;
778✔
2690
      goto _OVER;
778✔
2691
    }
2692

2693
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
31,525✔
2694
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
15,796✔
2695
      newDnodeId[++newIndex] = req.dnodeId1;
13,447✔
2696
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
13,447✔
2697
    }
2698

2699
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
31,525✔
2700
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
21,118✔
2701
      newDnodeId[++newIndex] = req.dnodeId2;
15,660✔
2702
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
15,660✔
2703
    }
2704

2705
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
31,525✔
2706
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
23,708✔
2707
      newDnodeId[++newIndex] = req.dnodeId3;
20,365✔
2708
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
20,365✔
2709
    }
2710

2711
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
31,525✔
2712
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
21,180✔
2713
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
16,511✔
2714
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
16,511✔
2715
    }
2716

2717
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
31,525✔
2718
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
15,734✔
2719
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
12,586✔
2720
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
12,586✔
2721
    }
2722

2723
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
31,525✔
2724
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
23,718✔
2725
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
20,375✔
2726
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
20,375✔
2727
    }
2728

2729
    if (newDnodeId[0] != 0) {
31,525✔
2730
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
30,516✔
2731
      if (pNew1 == NULL) {
30,516✔
2732
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2733
        if (terrno != 0) code = terrno;
×
2734
        goto _OVER;
×
2735
      }
2736
      if (!mndIsDnodeOnline(pNew1, curMs)) {
30,516✔
2737
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1,112✔
2738
        goto _OVER;
1,112✔
2739
      }
2740
    }
2741

2742
    if (newDnodeId[1] != 0) {
30,413✔
2743
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
12,180✔
2744
      if (pNew2 == NULL) {
12,180✔
2745
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2746
        if (terrno != 0) code = terrno;
×
2747
        goto _OVER;
×
2748
      }
2749
      if (!mndIsDnodeOnline(pNew2, curMs)) {
12,180✔
2750
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2751
        goto _OVER;
×
2752
      }
2753
    }
2754

2755
    if (newDnodeId[2] != 0) {
30,413✔
2756
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
6,108✔
2757
      if (pNew3 == NULL) {
6,108✔
2758
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2759
        if (terrno != 0) code = terrno;
×
2760
        goto _OVER;
×
2761
      }
2762
      if (!mndIsDnodeOnline(pNew3, curMs)) {
6,108✔
2763
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2764
        goto _OVER;
×
2765
      }
2766
    }
2767

2768
    if (oldDnodeId[0] != 0) {
30,413✔
2769
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
29,404✔
2770
      if (pOld1 == NULL) {
29,404✔
2771
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2772
        if (terrno != 0) code = terrno;
×
2773
        goto _OVER;
×
2774
      }
2775
      if (!mndIsDnodeOnline(pOld1, curMs)) {
29,404✔
2776
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1,013✔
2777
        goto _OVER;
1,013✔
2778
      }
2779
    }
2780

2781
    if (oldDnodeId[1] != 0) {
29,400✔
2782
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
11,167✔
2783
      if (pOld2 == NULL) {
11,167✔
2784
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2785
        if (terrno != 0) code = terrno;
×
2786
        goto _OVER;
×
2787
      }
2788
      if (!mndIsDnodeOnline(pOld2, curMs)) {
11,167✔
2789
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2790
        goto _OVER;
×
2791
      }
2792
    }
2793

2794
    if (oldDnodeId[2] != 0) {
29,400✔
2795
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
5,095✔
2796
      if (pOld3 == NULL) {
5,095✔
2797
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2798
        if (terrno != 0) code = terrno;
×
2799
        goto _OVER;
×
2800
      }
2801
      if (!mndIsDnodeOnline(pOld3, curMs)) {
5,095✔
2802
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2803
        goto _OVER;
×
2804
      }
2805
    }
2806

2807
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
29,400✔
2808
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2809
      code = 0;
1,009✔
2810
      goto _OVER;
1,009✔
2811
    }
2812

2813
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
28,391✔
2814

2815
  } else if (pVgroup->replica == 2) {
1,600✔
2816
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
1,600✔
2817
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2818
      goto _OVER;
×
2819
    }
2820

2821
    if (req.dnodeId1 == req.dnodeId2) {
1,600✔
2822
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2823
      goto _OVER;
×
2824
    }
2825

2826
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
1,600✔
2827
      newDnodeId[++newIndex] = req.dnodeId1;
1,600✔
2828
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,600✔
2829
    }
2830

2831
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,600✔
2832
      newDnodeId[++newIndex] = req.dnodeId2;
1,600✔
2833
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,600✔
2834
    }
2835

2836
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
1,600✔
2837
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
1,600✔
2838
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,600✔
2839
    }
2840

2841
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,600✔
2842
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
1,600✔
2843
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,600✔
2844
    }
2845

2846
    if (newDnodeId[0] != 0) {
1,600✔
2847
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
1,600✔
2848
      if (pNew1 == NULL) {
1,600✔
2849
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2850
        if (terrno != 0) code = terrno;
×
2851
        goto _OVER;
×
2852
      }
2853
      if (!mndIsDnodeOnline(pNew1, curMs)) {
1,600✔
2854
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2855
        goto _OVER;
×
2856
      }
2857
    }
2858

2859
    if (newDnodeId[1] != 0) {
1,600✔
2860
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
1,600✔
2861
      if (pNew2 == NULL) {
1,600✔
2862
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2863
        if (terrno != 0) code = terrno;
×
2864
        goto _OVER;
×
2865
      }
2866
      if (!mndIsDnodeOnline(pNew2, curMs)) {
1,600✔
2867
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2868
        goto _OVER;
×
2869
      }
2870
    }
2871

2872
    if (oldDnodeId[0] != 0) {
1,600✔
2873
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
1,600✔
2874
      if (pOld1 == NULL) {
1,600✔
2875
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2876
        if (terrno != 0) code = terrno;
×
2877
        goto _OVER;
×
2878
      }
2879
      if (!mndIsDnodeOnline(pOld1, curMs)) {
1,600✔
2880
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2881
        goto _OVER;
×
2882
      }
2883
    }
2884

2885
    if (oldDnodeId[1] != 0) {
1,600✔
2886
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
1,600✔
2887
      if (pOld2 == NULL) {
1,600✔
2888
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2889
        if (terrno != 0) code = terrno;
×
2890
        goto _OVER;
×
2891
      }
2892
      if (!mndIsDnodeOnline(pOld2, curMs)) {
1,600✔
2893
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2894
        goto _OVER;
×
2895
      }
2896
    }
2897

2898
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
1,600✔
2899
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2900
      code = 0;
×
2901
      goto _OVER;
×
2902
    }
2903

2904
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
1,600✔
2905
  } else {
2906
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2907
    goto _OVER;
×
2908
  }
2909

2910
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
43,289✔
2911

2912
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
43,289✔
2913
    char obj[33] = {0};
43,289✔
2914
    (void)snprintf(obj, sizeof(obj), "%d", req.vgId);
43,289✔
2915

2916
    int64_t tse = taosGetTimestampMs();
43,289✔
2917
    double  duration = (double)(tse - tss);
43,289✔
2918
    duration = duration / 1000;
43,289✔
2919
    auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen, duration, 0);
43,289✔
2920
  }
2921
_OVER:
52,647✔
2922
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
52,647✔
2923
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
10,630✔
2924
           req.dnodeId3, tstrerror(code));
2925
  }
2926

2927
  mndReleaseDnode(pMnode, pNew1);
52,647✔
2928
  mndReleaseDnode(pMnode, pNew2);
52,647✔
2929
  mndReleaseDnode(pMnode, pNew3);
52,647✔
2930
  mndReleaseDnode(pMnode, pOld1);
52,647✔
2931
  mndReleaseDnode(pMnode, pOld2);
52,647✔
2932
  mndReleaseDnode(pMnode, pOld3);
52,647✔
2933
  mndReleaseVgroup(pMnode, pVgroup);
52,647✔
2934
  mndReleaseDb(pMnode, pDb);
52,647✔
2935
  tFreeSRedistributeVgroupReq(&req);
52,647✔
2936

2937
  TAOS_RETURN(code);
52,647✔
2938
}
2939

2940
// Build the wire message for a force-become-follower request of one vgroup.
//
// The returned buffer holds an SMsgHead followed by the serialized
// SForceBecomeFollowerReq. Both header fields are stored via htonl().
//
// pMnode   - not used by this function.
// pVgroup  - vgroup whose vgId is placed in the header and in the request.
// dnodeId  - not used by this function.
// pContLen - receives the total buffer length (header + body) on success.
//
// Returns a buffer allocated with taosMemoryMalloc() that the caller must
// release, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY on any failure.
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // First pass with a NULL buffer only measures the serialized body.
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts after the header, so only bodyLen bytes remain in the
  // allocation; passing the full contLen would overstate the writable space.
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
2970

2971
// Append a TDMT_SYNC_FORCE_FOLLOWER redo action, addressed to dnodeId, to pTrans.
//
// The dnode is acquired only long enough to copy its endpoint set. The request
// buffer is freed here when appending the action fails.
//
// Returns 0 on success, otherwise an error code (terrno when it is set,
// TSDB_CODE_MND_RETURN_VALUE_NULL as the fallback for NULL results).
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The action was not appended, so the buffer is still ours to release.
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
3003

3004
// Serialize an election-baseline change request for one vgroup.
//
// pVgroup  - supplies the vgId written into the request.
// ms       - value stored in the request's electBaseLine field.
// pContLen - receives the serialized length on success.
// pMnode, pDb and dnodeId are not used by this function.
//
// Returns a buffer allocated with taosMemoryMalloc() (no message header is
// prepended), or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY on failure.
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                                int32_t *pContLen, int32_t ms) {
  SAlterVnodeElectBaselineReq baselineReq = {0};
  baselineReq.vgId = pVgroup->vgId;
  baselineReq.electBaseLine = ms;

  // Measuring pass: a NULL buffer yields the required length.
  int32_t len = tSerializeSAlterVnodeReplicaReq(NULL, 0, &baselineReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pBuf, len, &baselineReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", baselineReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
3032

3033
// Append a TDMT_VND_ALTER_ELECTBASELINE redo action for one vnode of pVgroup.
//
// dnodeId - dnode the action is sent to; its endpoint set is copied while the
//           dnode object is held.
// ms      - election baseline value forwarded to the request builder.
//
// The action's groupId is set to the vgroup id. Returns 0 on success or an
// error code; the request buffer is freed here if the append fails.
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                                             SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &msgLen, ms);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_VND_ALTER_ELECTBASELINE;
  action.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // Append failed, so the buffer was not taken over by the transaction.
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
3066

3067
// Queue one election-baseline action per replica of pVgroup.
//
// The replica whose slot equals index % 3 gets a 1500 ms baseline and every
// other replica gets 5000 ms. A negative index (callers in this file pass -1)
// never equals a slot number, so all replicas then receive 5000 ms.
//
// Only the first pVgroup->replica slots of vnodeGid are visited. The earlier
// fixed bound of 3 also visited slot 2 of a two-replica vgroup, which is not
// a member of the group.
// NOTE(review): the selector is still index % 3, so for replica == 2 an index
// with remainder 2 prefers nobody - confirm whether index % replica is wanted.
//
// Returns -1 when the vgroup has at most one replica, 0 on success, or the
// first error reported while appending an action.
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans,
                                                              int32_t index) {
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  for (int32_t i = 0; i < replica; i++) {
    int32_t dnodeId = pVgroup->vnodeGid[i].dnodeId;
    int32_t baselineMs = 5000;

    if (i == index % 3) {
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, dnodeId);
      baselineMs = 1500;
    }

    TAOS_CHECK_RETURN(
        mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, dnodeId, baselineMs));
  }

  return 0;
}
3092

3093
// Add the actions that move leadership of pVgroup away from its current leader.
//
// The current leader is the first replica whose syncState is
// TAOS_SYNC_STATE_LEADER. When that dnode exists and is online, the following
// are appended in order: election baselines chosen by index, a force-follower
// action for the leader, an alter-vnode confirm action, and election baselines
// again with index -1. Finally the database is looked up once and released.
// When there is no usable leader dnode nothing is appended and 0 is returned.
//
// Returns -1 when the vgroup has at most one replica, 0 on success, otherwise
// the first error encountered.
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t code = 0;
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // Stays 0 when no replica currently reports itself as leader.
  int32_t dnodeId = 0;
  for (int i = 0; i < replica; i++) {
    if (pVgroup->vnodeGid[i].syncState != TAOS_SYNC_STATE_LEADER) continue;
    dnodeId = pVgroup->vnodeGid[i].dnodeId;
    break;
  }

  bool       exist = false;
  bool       online = false;
  int64_t    curMs = taosGetTimestampMs();
  SDnodeObj *pLeader = mndAcquireDnode(pMnode, dnodeId);
  if (pLeader != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pLeader, curMs);
    mndReleaseDnode(pMnode, pLeader);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
          online);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgid, dnodeId);
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));

  code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));

  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));

  // The database object is only probed for existence and released right away.
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
3153

3154
// Implementation hook for the balance-vgroup-leader message. A no-op definition
// is compiled below when TD_ENTERPRISE is not defined; otherwise the symbol is
// presumably provided by another translation unit (not visible here).
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

// Public entry point: forwards the request unchanged to the implementation hook.
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  return mndProcessVgroupBalanceLeaderMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: accepts the request, does nothing.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
3161

3162
// Re-account vgroup memory on every dnode in pArray and check it still fits.
//
// For each dnode, the memory of pOldVgroup (when the dnode hosts one of its
// replicas) is subtracted from memUsed and the memory of pNewVgroup (same
// condition) is added. The updated memUsed is written back to the array
// element. The check fails as soon as memAvail - memUsed is not positive.
//
// Returns 0 when every dnode passes, otherwise
// TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE.
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  for (int32_t idx = 0; idx < (int32_t)taosArrayGetSize(pArray); ++idx) {
    SDnodeObj *pDnode = taosArrayGet(pArray, idx);
    bool       hostsReplica = false;
    int64_t    memBefore = 0;
    int64_t    memAfter = 0;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pDnode->id, pDnode->memAvail, pDnode->memUsed);

    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      memBefore = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hostsReplica = true;
    }

    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      memAfter = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hostsReplica = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pDnode->id, memBefore, memAfter);

    pDnode->memUsed = pDnode->memUsed - memBefore + memAfter;

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    // Only dnodes that actually carry a replica are reported at info level.
    if (hostsReplica) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }

  return 0;
}
3202

3203
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
269,352✔
3204
                                  SArray *pArray, SVgObj *pNewVgroup) {
3205
  int32_t code = 0;
269,352✔
3206
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
269,352✔
3207

3208
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
269,352✔
3209
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
228,380✔
3210
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
228,380✔
3211
    return 0;
228,380✔
3212
  }
3213

3214
  // mndTransSetGroupParallel(pTrans);
3215

3216
  if (pNewDb->cfg.replications == 3) {
40,972✔
3217
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
35,282✔
3218
          pVgroup->vnodeGid[0].dnodeId);
3219

3220
    // add second
3221
    if (pNewVgroup->replica == 1) {
35,282✔
3222
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
35,282✔
3223
    }
3224

3225
    // learner stage
3226
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
34,479✔
3227
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
34,479✔
3228
    TAOS_CHECK_RETURN(
34,479✔
3229
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3230

3231
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
34,479✔
3232

3233
    // follower stage
3234
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
34,479✔
3235
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
34,479✔
3236
    TAOS_CHECK_RETURN(
34,479✔
3237
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3238

3239
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
34,479✔
3240

3241
    // add third
3242
    if (pNewVgroup->replica == 2) {
34,479✔
3243
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
34,479✔
3244
    }
3245

3246
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
32,722✔
3247
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
32,722✔
3248
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
32,722✔
3249
    TAOS_CHECK_RETURN(
32,722✔
3250
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3251
    TAOS_CHECK_RETURN(
32,722✔
3252
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3253
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
32,722✔
3254

3255
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
32,722✔
3256
  } else if (pNewDb->cfg.replications == 1) {
5,690✔
3257
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
4,090✔
3258
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3259
          pVgroup->vnodeGid[2].dnodeId);
3260

3261
    SVnodeGid del1 = {0};
4,090✔
3262
    SVnodeGid del2 = {0};
4,090✔
3263
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
4,090✔
3264
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
4,090✔
3265
    TAOS_CHECK_RETURN(
4,090✔
3266
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3267
    TAOS_CHECK_RETURN(
4,090✔
3268
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3269
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4,090✔
3270

3271
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
4,090✔
3272
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
4,090✔
3273
    TAOS_CHECK_RETURN(
4,090✔
3274
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3275
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4,090✔
3276
  } else if (pNewDb->cfg.replications == 2) {
1,600✔
3277
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,600✔
3278
          pVgroup->vnodeGid[0].dnodeId);
3279

3280
    // add second
3281
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,600✔
3282

3283
    // learner stage
3284
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,600✔
3285
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,600✔
3286
    TAOS_CHECK_RETURN(
1,600✔
3287
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3288

3289
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,600✔
3290

3291
    // follower stage
3292
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,600✔
3293
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,600✔
3294
    TAOS_CHECK_RETURN(
1,600✔
3295
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3296

3297
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,600✔
3298
  } else {
3299
    return -1;
×
3300
  }
3301

3302
  mndSortVnodeGid(pNewVgroup);
38,412✔
3303

3304
  {
3305
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
38,412✔
3306
    if (pVgRaw == NULL) {
38,412✔
3307
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3308
      if (terrno != 0) code = terrno;
×
3309
      TAOS_RETURN(code);
×
3310
    }
3311
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
38,412✔
3312
      sdbFreeRaw(pVgRaw);
×
3313
      TAOS_RETURN(code);
×
3314
    }
3315
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
38,412✔
3316
    if (code != 0) {
38,412✔
3317
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3318
      TAOS_RETURN(code);
×
3319
    }
3320
  }
3321

3322
  TAOS_RETURN(code);
38,412✔
3323
}
3324

3325
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3326
                                      SArray *pArray) {
3327
  int32_t code = 0;
×
3328
  SVgObj  newVgroup = {0};
×
3329
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3330

3331
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3332
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3333
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3334
    return 0;
×
3335
  }
3336

3337
  mndTransSetSerial(pTrans);
×
3338

3339
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3340
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3341

3342
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3343
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3344
          pVgroup->vnodeGid[0].dnodeId);
3345

3346
    // add second
3347
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3348
    // add third
3349
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3350

3351
    // add learner stage
3352
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3353
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3354
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3355
    TAOS_CHECK_RETURN(
×
3356
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3357
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3358
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3359
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3360
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3361
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3362
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3363
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3364
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3365

3366
    // check learner
3367
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3368
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3369
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3370
    TAOS_CHECK_RETURN(
×
3371
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3372
    TAOS_CHECK_RETURN(
×
3373
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3374

3375
    // change raft type
3376
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3377
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3378
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3379
    TAOS_CHECK_RETURN(
×
3380
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3381

3382
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3383

3384
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3385
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3386
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3387
    TAOS_CHECK_RETURN(
×
3388
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3389

3390
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3391

3392
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3393
    if (pVgRaw == NULL) {
×
3394
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3395
      if (terrno != 0) code = terrno;
×
3396
      TAOS_RETURN(code);
×
3397
    }
3398
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3399
      sdbFreeRaw(pVgRaw);
×
3400
      TAOS_RETURN(code);
×
3401
    }
3402
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3403
    if (code != 0) {
×
3404
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3405
             __LINE__);
3406
      TAOS_RETURN(code);
×
3407
    }
3408
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3409
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3410
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3411

3412
    SVnodeGid del1 = {0};
×
3413
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3414

3415
    TAOS_CHECK_RETURN(
×
3416
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3417

3418
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3419

3420
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3421

3422
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3423
    if (pVgRaw == NULL) {
×
3424
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3425
      if (terrno != 0) code = terrno;
×
3426
      TAOS_RETURN(code);
×
3427
    }
3428
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3429
      sdbFreeRaw(pVgRaw);
×
3430
      TAOS_RETURN(code);
×
3431
    }
3432
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3433
    if (code != 0) {
×
3434
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3435
             __LINE__);
3436
      TAOS_RETURN(code);
×
3437
    }
3438

3439
    SVnodeGid del2 = {0};
×
3440
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3441

3442
    TAOS_CHECK_RETURN(
×
3443
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3444

3445
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3446

3447
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3448

3449
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3450
    if (pVgRaw == NULL) {
×
3451
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3452
      if (terrno != 0) code = terrno;
×
3453
      TAOS_RETURN(code);
×
3454
    }
3455
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3456
      sdbFreeRaw(pVgRaw);
×
3457
      TAOS_RETURN(code);
×
3458
    }
3459
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3460
    if (code != 0) {
×
3461
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3462
             __LINE__);
3463
      TAOS_RETURN(code);
×
3464
    }
3465
  } else {
3466
    return -1;
×
3467
  }
3468

3469
  mndSortVnodeGid(&newVgroup);
×
3470

3471
  {
3472
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3473
    if (pVgRaw == NULL) {
×
3474
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3475
      if (terrno != 0) code = terrno;
×
3476
      TAOS_RETURN(code);
×
3477
    }
3478
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3479
      sdbFreeRaw(pVgRaw);
×
3480
      TAOS_RETURN(code);
×
3481
    }
3482
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3483
    if (code != 0) {
×
3484
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3485
             __LINE__);
3486
      TAOS_RETURN(code);
×
3487
    }
3488
  }
3489

3490
  TAOS_RETURN(code);
×
3491
}
3492

3493
// Build the actions that restore the vnode of pVgroup located on pDnode.
//
// Works on a local copy of pVgroup whose node roles are adjusted before each
// action is queued:
//   replica 1: every slot is a voter; a create-vnode action is queued for the
//              slot on pDnode (slot 0 if none matches).
//   replica 2: the slot on pDnode is a learner while an alter-type action for
//              pAnotherDnode and a create action for pDnode are queued; then
//              all slots become voters and alter-type actions are queued for
//              pDnode and pAnotherDnode.
//   replica 3: the slot on pDnode is a learner for the create action, then all
//              slots become voters for the alter-type action on pDnode.
// Any other replica count queues no vnode action. In every case the working
// copy is appended to the commit log with status SDB_STATUS_READY.
//
// Returns 0 on success or the first error code reported by a step.
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  SVnodeGid *pGid = newVgroup.vnodeGid;

  switch (newVgroup.replica) {
    case 1: {
      int target = 0;
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
        if (pGid[i].dnodeId == pDnode->id) target = i;
      }
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &pGid[target]));
      break;
    }

    case 2: {
      // restored vnode joins as learner, the other stays voter
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = (pGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

      // roles are assigned again before the create action, as in the step above
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = (pGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      // finally everybody is a voter
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
      break;
    }

    case 3: {
      // restored vnode joins as learner, the others stay voters
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = (pGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      // finally everybody is a voter
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      break;
    }

    default:
      break;
  }

  SSdbRaw *pRaw = mndVgroupActionEncode(&newVgroup);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    // Not appended, so the raw record is still owned here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3572

3573
// Placeholder for a "adjust vnode hash range" transaction action.
// The current implementation adds nothing to the transaction and always reports success.
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;

  int32_t result = 0;
  return result;
}
3576

3577
// Signature of a transaction log appender; the status-action helpers in this file use it to
// pick between mndTransAppendCommitlog and mndTransAppendRedolog at run time.
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3578

3579
/*
 * Encode pVg and append the record to pTrans, then stamp the record with vgStatus.
 *
 * stage selects the log: TRN_STAGE_COMMIT_ACTION appends a commit log, any other value
 * appends a redo log.
 *
 * Returns 0 on success. On failure returns the error of the failing step; when encoding
 * fails without setting terrno the result is TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // The append did not take the record, so it is still ours to release.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // After a successful append the record must not be freed here: every other append site in
  // this file leaves the raw alone once the append succeeded (presumably the transaction
  // releases it when dropped), so freeing it on a status failure would release it twice.
  code = sdbSetRawStatus(pRaw, vgStatus);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3600

3601
/*
 * Encode pDb and append the record to pTrans, then stamp the record with dbStatus.
 *
 * stage selects the log: TRN_STAGE_COMMIT_ACTION appends a commit log, any other value
 * appends a redo log.
 *
 * Returns 0 on success. On failure returns the error of the failing step; when encoding
 * fails without setting terrno the result is TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // The append did not take the record, so it is still ours to release.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // After a successful append the record must not be freed here: every other append site in
  // this file leaves the raw alone once the append succeeded (presumably the transaction
  // releases it when dropped), so freeing it on a status failure would release it twice.
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3622

3623
/*
 * Build and prepare a "split-vgroup" transaction that replaces pVgroup with two new
 * single-replica vgroups, each covering one half of the original hash range, and then
 * restores the database's configured replica count on both.
 *
 * Outline of the actions appended, in order:
 *   1. Bring the source vgroup to exactly two vnodes (add one for replica 1, drop one for
 *      replica 3).
 *   2. Disable writes on those vnodes.
 *   3. Derive newVg1 (first vnode, lower half) and newVg2 (second vnode, upper half),
 *      assign them fresh vgIds and alter the vnodes' hash ranges.
 *   4. Record the new vgroups as ready, the source vgroup as dropped, and bump the
 *      database's vgVersion / numOfVgroups, both as redo logs and as commit logs.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code. All local
 * resources (dnode array, transaction handle, copied retention array) are released before
 * returning in either case.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  // Fail early when the dnode list could not be built, as the balance-vgroup handler does.
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // NOTE: databases with pDb->cfg.withArbitrator set are not rejected here; an earlier check
  // for that case is disabled.

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  if (newVg1.replica == 1) {
    // One replica: add a second vnode as learner, create it, then promote it to voter.
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    // Three replicas: drop one vnode and tell the two remaining ones about the new membership.
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // Any other replica count falls through without membership changes.
  }

  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // newVg1 keeps vnode 0 and the lower half of the range; newVg2 takes vnode 1 and the upper half.
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  // Halve each bound before adding so the midpoint computation cannot overflow.
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  // Iterate over newVg2's own replica count (the original looped on newVg1.replica).
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // Take a private copy so the cleanup below never destroys the array owned by pDb.
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3785

3786
// Edition-specific split-vgroup handler. When TD_ENTERPRISE is not defined the stub further
// down is used; otherwise the definition is presumably linked in from elsewhere.
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

// Message handler entry point: delegates straight to the edition-specific implementation.
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t result = mndProcessSplitVgroupMsgImp(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: ignores the request and reports success.
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3793

3794
/*
 * Append to pTrans the actions that move one replica of pVgroup from dnode pSrc to dnode
 * pDst: first the replica is added on pDst, then removed from pSrc, and finally the
 * resulting vgroup record is queued as a commit log with status READY.
 *
 * pVgroup itself is not modified; all changes are applied to a local copy.
 * Returns 0 on success or the error code of the first failing step.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t status = 0;
  SVgObj  movedVg = *pVgroup;

  mInfo("vgId:%d, vgroup info before balance, replica:%d", movedVg.vgId, movedVg.replica);
  for (int32_t idx = 0; idx < movedVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
  }

  // Grow onto the destination before shrinking from the source.
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &movedVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &movedVg, pSrc->id));

  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&movedVg);
  if (pCommitRaw == NULL) {
    status = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) status = terrno;
    TAOS_RETURN(status);
  }

  status = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (status != 0) {
    // The record was not appended, so release it here.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(status);
  }

  status = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (status != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", movedVg.vgId, tstrerror(status), __LINE__);
    TAOS_RETURN(status);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", movedVg.vgId, movedVg.replica);
  for (int32_t idx = 0; idx < movedVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
  }
  TAOS_RETURN(status);
}
3831

3832
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
17,082✔
3833
                                            SHashObj *pBalancedVgroups) {
3834
  void   *pIter = NULL;
17,082✔
3835
  int32_t code = -1;
17,082✔
3836
  SSdb   *pSdb = pMnode->pSdb;
17,082✔
3837

3838
  while (1) {
10,527✔
3839
    SVgObj *pVgroup = NULL;
27,609✔
3840
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
27,609✔
3841
    if (pIter == NULL) break;
27,609✔
3842
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
27,609✔
3843
      sdbRelease(pSdb, pVgroup);
9,706✔
3844
      continue;
9,706✔
3845
    }
3846

3847
    bool existInSrc = false;
17,903✔
3848
    bool existInDst = false;
17,903✔
3849
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
51,954✔
3850
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
34,051✔
3851
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
34,051✔
3852
      if (pGid->dnodeId == pDst->id) existInDst = true;
34,051✔
3853
    }
3854

3855
    if (!existInSrc || existInDst) {
17,903✔
3856
      sdbRelease(pSdb, pVgroup);
821✔
3857
      continue;
821✔
3858
    }
3859

3860
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
17,082✔
3861
    if (pDb == NULL) {
17,082✔
3862
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3863
      if (terrno != 0) code = terrno;
×
3864
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3865
      goto _OUT;
×
3866
    }
3867

3868
    if (pDb->cfg.withArbitrator) {
17,082✔
3869
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3870
      goto _OUT;
×
3871
    }
3872

3873
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
17,082✔
3874
    if (code == 0) {
17,082✔
3875
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
17,082✔
3876
    }
3877

3878
  _OUT:
17,082✔
3879
    mndReleaseDb(pMnode, pDb);
17,082✔
3880
    sdbRelease(pSdb, pVgroup);
17,082✔
3881
    sdbCancelFetch(pSdb, pIter);
17,082✔
3882
    break;
17,082✔
3883
  }
3884

3885
  return code;
17,082✔
3886
}
3887

3888
/*
 * Create a "balance-vgroup" transaction and repeatedly move one vgroup replica from the
 * last dnode in sorted order to the first, until the projected scores say no further move
 * helps or no movable vgroup is found.
 *
 * pArray holds the dnodes to balance across; it is re-sorted on every round and the
 * numOfVnodes counters of its elements are updated as replicas are scheduled to move.
 * The caller in this file only invokes this function with at least two dnodes.
 *
 * Returns 0 when nothing needed to be moved, TSDB_CODE_ACTION_IN_PROGRESS when the
 * transaction was prepared, otherwise an error code.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    // Report a real error code instead of the bare -1 placeholder.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // Argument order follows the labels: "others" then "support".
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    // After sorting, the move goes from the last element to the first one.
    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Scores as they would be after moving one vnode from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Keep the prepare result: previously a failure here could return a stale 0.
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3956

3957
/*
 * Handler for the balance-vgroup request.
 *
 * Steps: deserialize the request, check the MND_OPER_BALANCE_VGROUP privilege, refuse when
 * any mount exists or any dnode is offline, then build the dnode array and run
 * mndBalanceVgroup when there are at least two dnodes. An audit record is written when
 * tsAuditLevel >= AUDIT_LEVEL_CLUSTER and the request got as far as the balancing step.
 *
 * Returns 0, TSDB_CODE_ACTION_IN_PROGRESS, or an error code.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();
  int64_t tss = taosGetTimestampMs();

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // Every dnode must be online before any replica is moved.
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Describe the code chosen above; terrno has not been set to it at this point.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
    int64_t tse = taosGetTimestampMs();
    // Elapsed time is measured in milliseconds and recorded in seconds.
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
4026

4027
// Returns true when pVgroup is not flagged as a TSMA vgroup and its dbUid equals dbUid.
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
153,487,228✔
4028

4029
// Returns true when any of the vgroup's replicas is placed on the dnode identified by dnodeId.
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool found = false;
  for (int idx = 0; idx < pVgroup->replica && !found; idx++) {
    found = (pVgroup->vnodeGid[idx].dnodeId == dnodeId);
  }
  return found;
}
4035

4036
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
175,070✔
4037
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4038
                                     ETriggerType triggerType) {
4039
  SCompactVnodeReq compactReq = {0};
175,070✔
4040
  compactReq.dbUid = pDb->uid;
175,070✔
4041
  compactReq.compactStartTime = compactTs;
175,070✔
4042
  compactReq.tw = tw;
175,070✔
4043
  compactReq.metaOnly = metaOnly;
175,070✔
4044
  compactReq.force = force;
175,070✔
4045
  compactReq.optrType = type;
175,070✔
4046
  compactReq.triggerType = triggerType;
175,070✔
4047
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
175,070✔
4048

4049
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
175,070✔
4050
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
175,070✔
4051
  if (contLen < 0) {
175,070✔
4052
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4053
    return NULL;
×
4054
  }
4055
  contLen += sizeof(SMsgHead);
175,070✔
4056

4057
  void *pReq = taosMemoryMalloc(contLen);
175,070✔
4058
  if (pReq == NULL) {
175,070✔
4059
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4060
    return NULL;
×
4061
  }
4062

4063
  SMsgHead *pHead = pReq;
175,070✔
4064
  pHead->contLen = htonl(contLen);
175,070✔
4065
  pHead->vgId = htonl(pVgroup->vgId);
175,070✔
4066

4067
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
175,070✔
4068
    taosMemoryFree(pReq);
×
4069
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4070
    return NULL;
×
4071
  }
4072
  *pContLen = contLen;
175,070✔
4073
  return pReq;
175,070✔
4074
}
4075

4076
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
70,074✔
4077
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4078
                                        ETriggerType triggerType) {
4079
  int32_t      code = 0;
70,074✔
4080
  STransAction action = {0};
70,074✔
4081
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
70,074✔
4082

4083
  int32_t contLen = 0;
70,074✔
4084
  void   *pReq =
4085
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
70,074✔
4086
  if (pReq == NULL) {
70,074✔
4087
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4088
    if (terrno != 0) code = terrno;
×
4089
    TAOS_RETURN(code);
×
4090
  }
4091

4092
  action.pCont = pReq;
70,074✔
4093
  action.contLen = contLen;
70,074✔
4094
  action.msgType = TDMT_VND_COMPACT;
70,074✔
4095

4096
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
70,074✔
4097
    taosMemoryFree(pReq);
×
4098
    TAOS_RETURN(code);
×
4099
  }
4100

4101
  TAOS_RETURN(code);
70,074✔
4102
}
4103

4104
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
70,074✔
4105
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4106
                                    ETriggerType triggerType) {
4107
  TAOS_CHECK_RETURN(
70,074✔
4108
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4109
  return 0;
70,074✔
4110
}
4111

4112
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
104,996✔
4113
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4114
  int32_t      code = 0;
104,996✔
4115
  STransAction action = {0};
104,996✔
4116
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
104,996✔
4117

4118
  int32_t contLen = 0;
104,996✔
4119
  // reuse SCompactVnodeReq as SVTrimDbReq
4120
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
104,996✔
4121
  if (pReq == NULL) {
104,996✔
4122
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4123
    if (terrno != 0) code = terrno;
×
4124
    TAOS_RETURN(code);
×
4125
  }
4126

4127
  action.pCont = pReq;
104,996✔
4128
  action.contLen = contLen;
104,996✔
4129
  action.msgType = TDMT_VND_TRIM;
104,996✔
4130

4131
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
104,996✔
4132
    taosMemoryFree(pReq);
×
4133
    TAOS_RETURN(code);
×
4134
  }
4135

4136
  TAOS_RETURN(code);
104,996✔
4137
}
4138

4139
/*
 * Handler for the "set vgroup keep version" request.
 *
 * Steps: deserialize the request, check the MND_OPER_WRITE_DB privilege, look up the
 * vgroup, then build a "set-vgroup-keep-version" transaction that
 *   - queues a commit log carrying the vgroup record with the new keepVersion and the
 *     current time as keepVersionTime, and
 *   - appends one TDMT_VND_SET_KEEP_VERSION redo action per replica, addressed to the
 *     dnode hosting it (TSDB_CODE_VND_STOPPED is set as the action's acceptable code).
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, otherwise
 * an error code. The vgroup reference and the transaction handle are released on every
 * path.
 */
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Work on a copy so the acquired vgroup object itself is left untouched.
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Queue the updated vgroup record as a commit log of the transaction.
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  // Use the call's own return value; terrno may be 0 or stale at this point.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    // A negative size must not be used to compute the allocation length below.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + (int32_t)sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    // Use the call's own return value; terrno may be 0 or stale at this point.
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // The vgroup is no longer needed; release it before preparing, as the original did.
  mndReleaseVgroup(pMnode, pVgroup);
  pVgroup = NULL;

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc