• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.94
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndEncryptAlgr.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTopic.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "tmisce.h"
32

33
#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2
34
#define VGROUP_VER_NUMBER                VGROUP_VER_COMPAT_MOUNT_KEEP_VER
35
#define VGROUP_RESERVE_SIZE              60
36
// since 3.3.6.32/3.3.8.6 mountId + keepVersion + keepVersionTime + VGROUP_RESERVE_SIZE = 4 + 8 + 8 + 60 = 80
37
#define DLEN_AFTER_SYNC_CONF_CHANGE_VER 80
38

39
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
40
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
41
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
42
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
43

44
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
45
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
46
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
48

49
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
50
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
51
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
52
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
53
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
54

55
/*
 * Register the vgroup sdb table together with every message handler and
 * show handler wired up below.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  // Response message types that are all handed to mndTransProcessRsp,
  // listed in their registration order.
  const int32_t transRspTypes[] = {
      TDMT_DND_CREATE_VNODE_RSP,
      TDMT_VND_ALTER_REPLICA_RSP,
      TDMT_VND_ALTER_CONFIG_RSP,
      TDMT_VND_ALTER_CONFIRM_RSP,
      TDMT_VND_SET_KEEP_VERSION_RSP,
      TDMT_VND_ALTER_HASHRANGE_RSP,
      TDMT_DND_DROP_VNODE_RSP,
      TDMT_VND_COMPACT_RSP,
      TDMT_VND_SCAN_RSP,
      TDMT_VND_DISABLE_WRITE_RSP,
      TDMT_SYNC_FORCE_FOLLOWER_RSP,
      TDMT_VND_ALTER_ELECTBASELINE_RSP,
      TDMT_DND_ALTER_VNODE_TYPE_RSP,
      TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP,
      TDMT_SYNC_CONFIG_CHANGE_RSP,
  };
  const int32_t numOfRspTypes = (int32_t)(sizeof(transRspTypes) / sizeof(transRspTypes[0]));

  for (int32_t idx = 0; idx < numOfRspTypes; ++idx) {
    mndSetMsgHandle(pMnode, transRspTypes[idx], mndTransProcessRsp);
  }

  // Requests processed by this module.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  // Retrieval / iterator-cancel callbacks for the vgroup and vnode show tables.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  // Callbacks the sdb invokes for SDB_VGROUP rows (int32 key).
  SSdbTable vgroupTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
98

99
/* Cleanup counterpart of mndInitVgroup(); the body performs no work. */
void mndCleanupVgroup(SMnode *pMnode) {
  // intentionally empty
}
459,297✔
100

101
/*
 * Serialize a vgroup object into a newly allocated sdb raw record.
 *
 * The record is written with version VGROUP_VER_NUMBER. Fields are emitted in
 * a fixed order: identity and timestamps, hash range, database name/uid,
 * flags, one dnodeId per replica, then syncConfChangeVer, mountVgId,
 * keepVersion, keepVersionTime and VGROUP_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success. On failure the partially built record is
 * freed, NULL is returned and terrno is left non-zero.
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Pessimistic default; cleared only after every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  // Each SDB_SET_* jumps to _OVER on failure.
  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)

  // Only the dnodeId of every member is persisted.
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[i].dnodeId, _OVER)
  }

  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
143

144
/*
 * Rebuild a vgroup row from an sdb raw record.
 *
 * Records whose soft version is outside [1, VGROUP_VER_NUMBER] are rejected
 * with TSDB_CODE_SDB_INVALID_DATA_VER. The trailing fields (syncConfChangeVer,
 * mountVgId, keepVersion, keepVersionTime, reserve) are each read only when
 * the record is long enough to hold that field plus the reserved area, so
 * shorter records decode with those fields left at their allocation value.
 * For records older than VGROUP_VER_COMPAT_MOUNT_KEEP_VER the keep-version
 * fields are reset to their defaults afterwards.
 *
 * Returns the row on success; on failure frees the row, returns NULL and
 * leaves terrno non-zero.
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int32_t  dlenAfterSyncConfChangeVer = 0;
  int8_t   sver = 0;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  // Pessimistic default; cleared only when decoding completes.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (!(sver >= 1 && sver <= VGROUP_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  // Each SDB_GET_* jumps to _OVER on failure.
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)

  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
    // A single-member group starts out marked as leader.
    if (pVgroup->replica == 1) pVgid->syncState = TAOS_SYNC_STATE_LEADER;
    pVgid->snapSeq = -1;
  }

  if (pRaw->dataLen >= dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }

  // Bytes left once syncConfChangeVer has (possibly) been consumed; used by
  // the compatibility branch below.
  dlenAfterSyncConfChangeVer = pRaw->dataLen - dataPos;

  if (pRaw->dataLen >= dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  }
  if (pRaw->dataLen >= dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersion, _OVER)
  }
  if (pRaw->dataLen >= dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER)
  }
  if (pRaw->dataLen >= dataPos + VGROUP_RESERVE_SIZE) {
    SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  }

  if (sver < VGROUP_VER_COMPAT_MOUNT_KEEP_VER) {
    // Older record: with exactly DLEN_AFTER_SYNC_CONF_CHANGE_VER trailing
    // bytes the mount id read above is discarded.
    if (dlenAfterSyncConfChangeVer == DLEN_AFTER_SYNC_CONF_CHANGE_VER) {
      pVgroup->mountVgId = 0;
    }
    pVgroup->keepVersion = -1;
    pVgroup->keepVersionTime = 0;
  }

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRow;
  }

  mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
222

223
/*
 * Validate a raw vgroup record that a transaction wants to insert.
 *
 * The record is decoded into a temporary row and its vgId is compared with
 * the current maximum vgroup id in the sdb: a vgId smaller than that maximum
 * is rejected. The temporary row is always released before returning.
 *
 * Returns 0 when accepted. Decode failures return terrno when it is set,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; an id conflict returns -1.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int     code = -1;

  SSdbRow *pRow = mndVgroupActionDecode(pRaw);
  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }
  if (pRow == NULL || pVgroup == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (pVgroup->vgId < maxVgId) {
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
    goto _OVER;
  }

  code = 0;

_OVER:
  if (pVgroup != NULL) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
254

255
/* sdb insert callback for vgroup rows: emits a trace line and always returns 0. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);

  return 0;
}
259

260
/* sdb delete callback for vgroup rows: emits a trace line and always returns 0. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);

  return 0;
}
264

265
/*
 * sdb update callback: merge a newly decoded vgroup row (pNew) into the row
 * currently held in memory (pOld).
 *
 * Persisted attributes are taken from pNew. Per-member runtime state (sync
 * state, indexes, buffer usage, learner progress, snapshot sequence) is not
 * part of the encoded record, so for every member of pNew that already
 * existed in pOld (same dnodeId) that state is first carried over into pNew's
 * member entry, and the whole member array is then copied into pOld.
 * Runtime statistics of pOld are also mirrored into pNew.
 *
 * Always returns 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Snapshot the old member count BEFORE pOld->replica is overwritten below.
  // The lookup loop must scan exactly the members the old row had; using the
  // new count would skip old members when the group shrinks and would scan
  // stale slots when it grows.
  const int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;

  // Carry runtime state of surviving members over to the new member entries.
  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
        pNewGid->learnerProgress = pOldGid->learnerProgress;
        pNewGid->snapSeq = pOldGid->snapSeq;
        pNewGid->syncTotalIndex = pOldGid->syncTotalIndex;
      }
    }
  }

  // Mirror the statistics held by the old row into the new one.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  // Must run after the loop above: it replaces the old member array.
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);
  return 0;
}
304

305
/*
 * Acquire the vgroup row with the given id from the sdb.
 *
 * Returns the row, or NULL when sdbAcquire() fails. The generic
 * TSDB_CODE_SDB_OBJ_NOT_THERE error is translated into
 * TSDB_CODE_MND_VGROUP_NOT_EXIST; any other terrno is left untouched.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
313

314
/* Hand a vgroup row back to the sdb via sdbRelease(). */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
152,084,877✔
318

319
/*
 * Build a serialized SCreateVnodeReq for one member of a vgroup.
 *
 * pMnode   - used to look up dnodes and the encryption algorithm name
 * pDnode   - the dnode the request is addressed to; its position among the
 *            voters or learners becomes selfIndex / learnerSelfIndex
 * pDb      - database whose configuration is copied into the request
 * pVgroup  - vgroup being created; NOTE: pVgroup->syncConfChangeVer is
 *            incremented by this call, also when NULL is returned afterwards
 * pContLen - receives the length of the returned buffer on success
 *
 * Returns a buffer allocated with taosMemoryMalloc() holding the serialized
 * request, or NULL on failure. NULL is returned when a member dnode cannot be
 * acquired, when pDnode is not a member of the vgroup (TSDB_CODE_APP_ERROR),
 * or when sizing/allocation/serialization fails.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};

  // Identity of the vnode.
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;

  // Database configuration copied verbatim.
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.cacheLastShardBits = pDb->cfg.cacheLastShardBits;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;

  // Membership is filled in by the loop below.
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;

  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;

  // Side effect on the caller's vgroup object: the change version is bumped
  // and the bumped value is what the request carries.
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  memset(createReq.encryptAlgrName, 0, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pDb->cfg.encryptAlgorithm > 0) {
    mndGetEncryptOsslAlgrNameById(pMnode, pDb->cfg.encryptAlgorithm, createReq.encryptAlgrName);
  }
  createReq.isAudit = pDb->cfg.isAudit ? 1 : 0;
  createReq.allowDrop = pDb->cfg.allowDrop;
  createReq.secureDelete = pDb->cfg.secureDelete;
  int32_t code = 0;

  // Split the members into voters and learners, recording the endpoint of
  // each and the slot occupied by the target dnode.
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica     *pReplica = isVoter ? &createReq.replicas[createReq.replica]
                                     : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (isVoter) {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  // The target dnode must be one of the members.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  // Learners get their own label so they cannot be confused with voters in
  // the log (same wording as the alter-replica builder).
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  // First pass computes the encoded size, second pass writes the buffer.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
456

457
/*
 * Build an alter-vnode-config message: an SMsgHead followed by the
 * serialized SAlterVnodeConfigReq filled from the database configuration.
 *
 * pDb      - database whose configuration is copied into the request
 * pVgroup  - supplies vgVersion and the vgId written into the header
 * pContLen - receives the total message length (header + body) on success
 *
 * Returns a buffer allocated with taosMemoryMalloc(), or NULL with terrno set
 * to TSDB_CODE_OUT_OF_MEMORY when sizing, allocation or serialization fails.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.cacheLastShardBits = pDb->cfg.cacheLastShardBits;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;
  alterReq.allowDrop = (int8_t)pDb->cfg.allowDrop;
  alterReq.secureDelete = pDb->cfg.secureDelete;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // Size of the serialized body alone (no header).
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The header carries the total length and vgId in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts right after the header, so only bodyLen bytes remain in
  // the buffer; passing the total length would overstate the capacity by
  // sizeof(SMsgHead).
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
510

511
/*
 * Build a serialized SAlterVnodeReplicaReq describing the current membership
 * of a vgroup, addressed to the member living on dnodeId.
 *
 * pDb      - supplies the strict flag
 * pVgroup  - vgroup whose members are listed; NOTE: its syncConfChangeVer is
 *            incremented by this call, also when NULL is returned afterwards
 * dnodeId  - dnode the request is addressed to
 * pContLen - receives the length of the returned buffer on success
 *
 * selfIndex / learnerSelfIndex are indexes into replicas[] /
 * learnerReplicas[] respectively (the same convention
 * mndBuildCreateVnodeReq uses), not positions in pVgroup->vnodeGid.
 *
 * Returns a buffer allocated with taosMemoryMalloc(), or NULL when a member
 * dnode cannot be acquired, when dnodeId is not a member
 * (TSDB_CODE_APP_ERROR), or when sizing/allocation/serialization fails
 * (TSDB_CODE_OUT_OF_MEMORY).
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica     *pReplica = NULL;

    if (isVoter) {
      pReplica = &alterReq.replicas[alterReq.replica];
      // Record the slot inside replicas[], not the combined index v: the two
      // differ as soon as a learner precedes this voter in vnodeGid.
      if (dnodeId == pVgid->dnodeId) {
        alterReq.selfIndex = alterReq.replica;
      }
      alterReq.replica++;
    } else {
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
      // Same for learners: slot inside learnerReplicas[].
      if (dnodeId == pVgid->dnodeId) {
        alterReq.learnerSelfIndex = alterReq.learnerReplica;
      }
      alterReq.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  // The addressed dnode must be one of the members.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass computes the encoded size, second pass writes the buffer.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
593

594
/*
 * Build a serialized SCheckLearnCatchupReq describing the current membership
 * of a vgroup, addressed to the member living on dnodeId.
 *
 * pDb      - supplies the strict flag
 * pVgroup  - vgroup whose members are listed (not modified here)
 * dnodeId  - dnode the request is addressed to
 * pContLen - receives the length of the returned buffer on success
 *
 * selfIndex / learnerSelfIndex are indexes into replicas[] /
 * learnerReplicas[] respectively (the same convention
 * mndBuildCreateVnodeReq uses), not positions in pVgroup->vnodeGid.
 *
 * The request is encoded with tSerializeSAlterVnodeReplicaReq().
 * NOTE(review): this presumes SCheckLearnCatchupReq shares its layout with
 * SAlterVnodeReplicaReq - confirm against the message definitions.
 *
 * Returns a buffer allocated with taosMemoryMalloc(), or NULL when a member
 * dnode cannot be acquired, when dnodeId is not a member
 * (TSDB_CODE_APP_ERROR), or when sizing/allocation/serialization fails
 * (TSDB_CODE_OUT_OF_MEMORY).
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica     *pReplica = NULL;

    if (isVoter) {
      pReplica = &req.replicas[req.replica];
      // Record the slot inside replicas[], not the combined index v.
      if (dnodeId == pVgid->dnodeId) {
        req.selfIndex = req.replica;
      }
      req.replica++;
    } else {
      pReplica = &req.learnerReplicas[req.learnerReplica];
      // Record the slot inside learnerReplicas[], not the combined index v.
      if (dnodeId == pVgid->dnodeId) {
        req.learnerSelfIndex = req.learnerReplica;
      }
      req.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

  mInfo(
      "vgId:%d, build check learner catchup req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d "
      "strict:%d",
      req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  // The addressed dnode must be one of the members.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass computes the encoded size, second pass writes the buffer.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize check learner catchup req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
671

672
// Builds the serialized "disable vnode write" request (disable = 1) for vgId.
// pMnode and pDb are not read by this function.
// On success *pContLen receives the encoded length and the returned buffer comes
// from taosMemoryMalloc. On any failure NULL is returned and terrno is set to
// TSDB_CODE_OUT_OF_MEMORY.
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq writeOffReq = {0};
  writeOffReq.vgId = vgId;
  writeOffReq.disable = 1;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // Sizing pass: the value returned here is used as the allocation size.
  int32_t encodedLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &writeOffReq);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, encodedLen, &writeOffReq) < 0) {
    // Log before terrno is overwritten so terrstr() reports the serializer's error.
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = encodedLen;
  return pBuf;
}
700

701
// Builds the serialized "alter vnode hash range" request that points srcVgId at
// the hash range of pVgroup (which becomes dstVgId).
// Side effect: pVgroup->syncConfChangeVer is incremented and the new value is
// sent as changeVersion, even if serialization later fails.
// On success *pContLen receives the encoded length and the returned buffer comes
// from taosMemoryMalloc. On failure NULL is returned with terrno set to
// TSDB_CODE_OUT_OF_MEMORY. pMnode is not read here.
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq rangeReq = {0};
  rangeReq.srcVgId = srcVgId;
  rangeReq.dstVgId = pVgroup->vgId;
  rangeReq.hashBegin = pVgroup->hashBegin;
  rangeReq.hashEnd = pVgroup->hashEnd;
  rangeReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // Sizing pass: the value returned here is used as the allocation size.
  int32_t encodedLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &rangeReq);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, encodedLen, &rangeReq) < 0) {
    // Log before terrno is overwritten so terrstr() reports the serializer's error.
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = encodedLen;
  return pBuf;
}
733

734
// Builds the serialized "drop vnode" request for the replica of pVgroup that
// lives on pDnode. The request carries the dnode id, vgroup id, and the db
// name/uid copied from pDb. pMnode is not read here.
// On success *pContLen receives the encoded length and the returned buffer comes
// from taosMemoryMalloc. On failure NULL is returned with terrno set to
// TSDB_CODE_OUT_OF_MEMORY.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq req = {0};
  req.vgId = pVgroup->vgId;
  req.dnodeId = pDnode->id;
  req.dbUid = pDb->uid;
  memcpy(req.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", req.vgId);

  // Sizing pass: the value returned here is used as the allocation size.
  int32_t encodedLen = tSerializeSDropVnodeReq(NULL, 0, &req);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, encodedLen, &req) < 0) {
    // Log before terrno is overwritten so terrstr() reports the serializer's error.
    mError("vgId:%d, failed to serialize drop vnode req,since %s", req.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = encodedLen;
  return pBuf;
}
763

764
// sdbTraverse callback: clears the per-dnode counters (numOfVnodes and
// numOfOtherNodes) before they are recomputed by a later traversal.
// Always returns true; pMnode and p1..p3 are unused.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pNode = (SDnodeObj *)pObj;
  pNode->numOfOtherNodes = 0;
  pNode->numOfVnodes = 0;
  return true;
}
770

771
// sdbTraverse callback that collects candidate dnodes into an array.
//   p1: SArray* receiving SDnodeObj copies
//   p2: int32_t* id of a dnode to leave out
//   p3: optional SArray* of int32_t dnode ids; when non-NULL only listed dnodes
//       are considered, and an empty list rejects every dnode (TS-6191)
// For every dnode that passes the filters, numOfVnodes and memUsed are refreshed
// and numOfOtherNodes is bumped when the dnode also hosts an mnode. Only dnodes
// that are online and have numOfSupportVnodes > 0 are pushed to the array.
// Returns false only when the push fails; true otherwise.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;
  int32_t    skipId = *(int32_t *)p2;
  SArray    *pAllowed = p3;

  if (pDnode->id == skipId) {
    return true;
  }

  if (pAllowed != NULL) {
    int32_t nAllowed = taosArrayGetSize(pAllowed);
    if (nAllowed <= 0) {
      return true;  // TS-6191
    }

    bool listed = false;
    for (int32_t k = 0; k < nAllowed && !listed; ++k) {
      listed = (*(int32_t *)taosArrayGet(pAllowed, k) == pDnode->id);
    }
    if (!listed) {
      return true;
    }
  }

  int64_t curMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, curMs);
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  if (isMnode) {
    pDnode->numOfOtherNodes++;
  }

  // Offline dnodes, or dnodes that cannot host vnodes, are kept out of the result.
  if (!online || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
817

818
// Returns true when dnodeId occurs in dnodeList (an array of int32_t ids),
// false otherwise, including when the list is empty.
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t count = taosArrayGetSize(dnodeList);
  int32_t pos = 0;
  while (pos < count) {
    if (*(int32_t *)TARRAY_GET_ELEM(dnodeList, pos) == dnodeId) {
      return true;
    }
    ++pos;
  }
  return false;
}
828

829
#ifdef TD_ENTERPRISE
830
// Load score used for the dual-replica ordering: (vnodes + otherNodes * ratio +
// additionDnodes) divided by numOfSupportVnodes. The sign is flipped for dnodes
// that already host at least one vnode, so such dnodes sort before empty ones
// in an ascending sort.
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;
  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
835

836
// Comparator for the dual-replica ordering: ascending by mndGetDnodeScore1
// (ratio 0.9), with ties broken by ascending dnode id. Returns -1, 0 or 1.
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);

  if (lhs != rhs) {
    return lhs > rhs ? 1 : -1;
  }

  // Equal scores: fall back to the dnode id.
  if (pDnode1->id == pDnode2->id) {
    return 0;
  }
  return pDnode1->id > pDnode2->id ? 1 : -1;
}
847

848
// sdbTraverse callback for the dual-replica check. It refreshes numOfVnodes,
// bumps numOfOtherNodes when the dnode also hosts an mnode, and pushes a copy
// of every dnode with numOfSupportVnodes > 0 into the array passed as p1.
// Unlike mndBuildDnodesArrayFp it does not look at the online state.
// Returns false only when the push fails.
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;

  bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
864

865
// TS-6191
866
// Dual-replica restriction (TS-6191). When grantCheckDualReplicaDnodes() is
// false this is a no-op returning 0 and *ppDnodeList is left untouched.
// Otherwise a new array of SDnodeObj copies is stored in *ppDnodeList (the
// caller destroys it), sorted with mndCompareDnodeVnodes1, then trimmed:
//   - no user dnode list: keep at most the first two dnodes;
//   - user dnode list given: keep only listed dnodes, restricted to the first
//     two when at least two dnodes already host vnodes.
// Returns 0, or the terrno of a failed array allocation.
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SArray *pCandidates = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCandidates;

  SSdb *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCandidates, NULL, NULL);

  int32_t total = taosArrayGetSize(pCandidates);
  if (total <= 0) {
    TAOS_RETURN(code);
  }
  if (total > 1) {
    taosArraySort(pCandidates, (__compar_fn_t)mndCompareDnodeVnodes1);
  }

  int32_t userListSize = taosArrayGetSize(dnodeList);
  if (userListSize <= 0) {
    // No explicit list: only the first two dnodes in sort order stay eligible.
    if (total > 2) {
      taosArrayRemoveBatch(pCandidates, 2, total - 2, NULL);
    }
    TAOS_RETURN(code);
  }

  // After sorting, dnodes that already host vnodes come first; count that prefix.
  int32_t nWithVnodes = 0;
  while (nWithVnodes < total && ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, nWithVnodes))->numOfVnodes > 0) {
    ++nWithVnodes;
  }

  int32_t soleDnodeId = -1;
  if (nWithVnodes == 1) {
    soleDnodeId = ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, 0))->id;
  } else if (nWithVnodes >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCandidates, 2, total - 2, NULL);
  }

  // Drop every candidate the user did not list.
  int32_t idx = 0;
  while (idx < TARRAY_SIZE(pCandidates)) {
    SDnodeObj *pCand = taosArrayGet(pCandidates, idx);
    if (isDnodeInList(dnodeList, pCand->id)) {
      ++idx;
    } else {
      taosArrayRemove(pCandidates, idx);
    }
  }

  if (nWithVnodes == 1) {
    SDnodeObj *pFirst = taosArrayGet(pCandidates, 0);
    // the first dnode is not in dnodeList, remove the last element
    if (pFirst != NULL && pFirst->id != soleDnodeId) {
      taosArrayRemove(pCandidates, taosArrayGetSize(pCandidates) - 1);
    }
  }

  TAOS_RETURN(code);
}
924
#endif
925

926
// Builds an array of SDnodeObj copies that are candidates for hosting vnodes.
//   exceptDnodeId: id of a dnode to leave out
//   dnodeList:     optional array of int32_t ids restricting the candidates;
//                  NULL or empty means no restriction from the caller
// In enterprise builds the dual-replica check may substitute its own list.
// The per-dnode counters in sdb are reset and recomputed as a side effect.
// Returns the new array (to be destroyed by the caller), or NULL on failure.
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
  SArray *pResult = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SArray *pUserList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
  SArray *pDualList = NULL;
#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, pUserList, &pDualList) != 0) {
    taosArrayDestroy(pResult);
    return NULL;
  }
#endif

  SSdb *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  // The dual-replica list, when present, takes precedence over the caller's list.
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId,
              pDualList != NULL ? pDualList : pUserList);

  int32_t built = (int32_t)taosArrayGetSize(pResult);
  mDebug("build %d dnodes array", built);
  for (int32_t k = 0; k < built; ++k) {
    SDnodeObj *pEntry = taosArrayGet(pResult, k);
    mDebug("dnode:%d, vnodes:%d others:%d", pEntry->id, pEntry->numOfVnodes, pEntry->numOfOtherNodes);
  }

  taosArrayDestroy(pDualList);
  return pResult;
}
957

958
// Three-way comparison of two dnode ids: -1 if *dnode1Id is smaller, 1 if it
// is larger, 0 if both are equal.
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  const int32_t lhs = *dnode1Id;
  const int32_t rhs = *dnode2Id;
  if (lhs < rhs) {
    return -1;
  }
  return lhs > rhs ? 1 : 0;
}
964

965
// Load score of a dnode: (numOfVnodes + numOfOtherNodes * ratio + additionDnodes)
// divided by numOfSupportVnodes. Lower means less loaded relative to capacity.
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  return weightedLoad / pDnode->numOfSupportVnodes;
}
969

970
// Comparator ordering dnodes by ascending mndGetDnodeScore (ratio 0.9).
// Returns 1, 0 or -1; equal scores compare as 0 (no id tie-break).
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);
  if (lhs > rhs) {
    return 1;
  }
  if (lhs == rhs) {
    return 0;
  }
  return -1;
}
978

979
// Sorts pVgroup->vnodeGid[0 .. replica-1] in place by ascending dnodeId.
// Implemented as an insertion sort built from adjacent swaps, so entries with
// equal dnodeId keep their relative order.
void mndSortVnodeGid(SVgObj *pVgroup) {
  for (int32_t i = 1; i < pVgroup->replica; ++i) {
    for (int32_t j = i; j > 0 && pVgroup->vnodeGid[j - 1].dnodeId > pVgroup->vnodeGid[j].dnodeId; --j) {
      TSWAP(pVgroup->vnodeGid[j - 1], pVgroup->vnodeGid[j]);
    }
  }
}
2,799,630✔
988

989
// Picks dnodes for every replica of pVgroup from pArray (SDnodeObj copies).
// pArray is sorted in place by load score and the first `replica` entries are
// used. Each chosen entry has memUsed and numOfVnodes updated in pArray, so
// consecutive calls on the same array see the earlier allocations.
// Returns 0 on success, otherwise one of TSDB_CODE_MND_NO_ENOUGH_DNODES,
// TSDB_CODE_MND_NO_ENOUGH_VNODES or TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE.
// On failure the replicas assigned so far are not rolled back.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t size = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < size; ++i) {
    SDnodeObj *pSorted = taosArrayGet(pArray, i);
    mDebug("dnode:%d, score:%f", pSorted->id, mndGetDnodeScore(pSorted, 0, 0.9));
  }

  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  int32_t v = 0;
  while (v < pVgroup->replica) {
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    // The dnode must keep a positive memory margin after hosting this vnode.
    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    pVgid->dnodeId = pDnode->id;
    // A single replica starts as leader; otherwise every replica starts as follower.
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
    ++v;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1038

1039
// Initializes pVgroup as a single-replica TSMA vgroup of pDb and assigns it to
// the least loaded available dnode.
// Returns 0 on success. If the candidate dnode array cannot be built, the
// terrno-derived code is returned (TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
// is 0). If no dnode can host the vnode, -1 is returned (unchanged from the
// previous behavior).
// The candidate array is now destroyed on every path; it used to leak when
// mndGetAvailableDnode() failed.
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  // Release the candidate array before looking at the result so that the
  // failure path cannot leak it.
  int32_t allocCode = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  taosArrayDestroy(pArray);
  if (allocCode != 0) return -1;

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1065

1066
// Allocates and initializes pDb->cfg.numOfVgroups vgroup objects for pDb and
// assigns every replica to a dnode.
//   dnodeList: optional array of int32_t dnode ids restricting placement
// The 32-bit hash space [0, UINT32_MAX] is split into equal intervals, and the
// last vgroup absorbs the remainder. New vgroup ids start at
// max(sdbGetMaxId(SDB_VGROUP), 2).
// On success returns 0 and stores the array in *ppVgroups (the caller frees it
// with taosMemoryFree). On failure returns a non-zero code, frees the array and
// leaves *ppVgroups untouched.
// NOTE(review): numOfVgroups is used as a divisor; this assumes it was validated
// to be > 0 before the call — confirm against the caller.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Each vgroup takes its own creation time. The previous code read
    // pVgroups->createdTime, i.e. always the first element of the array.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      // The last vgroup absorbs the remainder of the integer division.
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1131

1132
// Builds the endpoint set of a vgroup from the fqdn/port of each replica's
// dnode. Replicas whose dnode cannot be acquired are skipped. inUse is set to
// the slot of a replica whose sync state is LEADER or ASSIGNED_LEADER.
// The result is passed through epsetSort() before it is returned.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet eps = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pNode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pNode != NULL) {
      switch (pGid->syncState) {
        case TAOS_SYNC_STATE_LEADER:
        case TAOS_SYNC_STATE_ASSIGNED_LEADER:
          // The endpoint added just below will occupy index numOfEps.
          eps.inUse = eps.numOfEps;
          break;
        default:
          break;
      }

      if (addEpIntoEpSet(&eps, pNode->fqdn, pNode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pNode->fqdn, pNode->port);
      }
      mndReleaseDnode(pMnode, pNode);
    }
  }

  epsetSort(&eps);
  return eps;
}
1153

1154
// Looks up the vgroup by id and builds its endpoint set the same way as
// mndGetVgroupEpset, except that the result is NOT passed through epsetSort().
// Returns a zeroed SEpSet when the vgroup cannot be acquired.
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet eps = {0};

  SVgObj *pVg = mndAcquireVgroup(pMnode, vgId);
  if (pVg == NULL) {
    return eps;
  }

  for (int32_t v = 0; v < pVg->replica; ++v) {
    const SVnodeGid *pGid = &pVg->vnodeGid[v];
    SDnodeObj       *pNode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pNode != NULL) {
      switch (pGid->syncState) {
        case TAOS_SYNC_STATE_LEADER:
        case TAOS_SYNC_STATE_ASSIGNED_LEADER:
          // The endpoint added just below will occupy index numOfEps.
          eps.inUse = eps.numOfEps;
          break;
        default:
          break;
      }

      if (addEpIntoEpSet(&eps, pNode->fqdn, pNode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVg->vgId, pNode->fqdn, pNode->port);
      }
      mndReleaseDnode(pMnode, pNode);
    }
  }

  mndReleaseVgroup(pMnode, pVg);
  return eps;
}
1178

1179
// Retrieve callback for the vgroups show object: appends up to `rows` rows to
// pBlock, one per vgroup, continuing from pShow->pIter.
// Columns, in order: vgId, db name, numOfTables, then for each of 4 replica
// slots (dnode id, role, applied/committed progress string), then isReady,
// cacheUsage, numOfCachedTables, isTsma, mountVgId, keepVersion, keepVersionTime.
// Slots beyond pVgroup->replica are filled with NULLs.
// When pShow->db is non-empty only vgroups of that database are returned; if
// that database cannot be acquired the function returns 0 rows without error.
// Returns the number of rows written, or an error code when `code` was set.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  // NOTE(review): several locals below (pVgDb, pUser, pIterDb, objFName, showAll,
  // showIter, dbUid) are not referenced directly in this body; presumably the
  // MND_SHOW_CHECK_* macros use them by name — confirm before removing any.
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SDbObj   *pDb = NULL;
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VGROUPS, PRIV_OBJ_DB, 0, _OVER);

  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      goto _OVER;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    // Skip vgroups that belong to another database when a filter db is set.
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VGROUPS, _OVER);

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      // NOTE(review): the iterator is not cancelled on this path — confirm that is intended.
      goto _OVER;
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    bool       isReady = false;
    bool       isLeaderRestored = false;
    bool       hasFollowerRestored = false;
    ESyncState leaderState = TAOS_SYNC_STATE_OFFLINE;

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      if (i >= pVgroup->replica) {
        // Unused replica slot: its three columns are all NULL.
        colDataSetNULL(pColInfo, numOfRows);
        for (int32_t k = 0; k < 2; ++k) {
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetNULL(pColInfo, numOfRows);
        }
        continue;
      }

      const SVnodeGid *pGid = &pVgroup->vnodeGid[i];

      int16_t dnodeId = (int16_t)pGid->dnodeId;
      COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

      bool       exist = false;
      bool       online = false;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode != NULL) {
        exist = true;
        online = mndIsDnodeOnline(pDnode, curMs);
        mndReleaseDnode(pMnode, pDnode);
      }

      // Role text: "dropping" if the dnode is gone, "offline" if it is not
      // online, otherwise the replica's sync state name.
      char buf1[20] = {0};
      char role[20] = "offline";
      if (!exist) {
        tstrncpy(role, "dropping", sizeof(role));
      } else if (online) {
        const bool isLeader = (pGid->syncState == TAOS_SYNC_STATE_LEADER ||
                               pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
        if (isLeader) {
          leaderState = pGid->syncState;
          if (pGid->syncRestore) {
            isLeaderRestored = true;
          }
        } else if (pGid->syncState == TAOS_SYNC_STATE_FOLLOWER) {
          if (pGid->syncRestore) {
            hasFollowerRestored = true;
          }
        }
        snprintf(role, sizeof(role), "%s", syncStr(pGid->syncState));
      }
      STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

      char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
      char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};

      // Progress text: applied/committed, plus total and learner progress for
      // learners, plus the snapshot sequence while a snapshot is in flight.
      const bool isLearner = (pGid->syncState == TAOS_SYNC_STATE_LEARNER);
      const bool snapActive = (pGid->snapSeq > 0 && pGid->snapSeq < SYNC_SNAPSHOT_SEQ_END);
      if (isLearner) {
        if (pDb != NULL) {
          mInfo("db:%s, learner progress:%d", pDb->name, pGid->learnerProgress);
        } else {
          mInfo("db:null, learner progress:%d", pGid->learnerProgress);
        }

        if (snapActive) {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)",
                   pGid->syncAppliedIndex, pGid->syncCommitIndex, pGid->syncTotalIndex, pGid->snapSeq,
                   pGid->learnerProgress);
        } else {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)",
                   pGid->syncAppliedIndex, pGid->syncCommitIndex, pGid->syncTotalIndex, pGid->learnerProgress);
        }
      } else if (snapActive) {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)", pGid->syncAppliedIndex,
                 pGid->syncCommitIndex, pGid->snapSeq);
      } else {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pGid->syncAppliedIndex, pGid->syncCommitIndex);
      }

      STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
    }

    // Readiness rule:
    //   replica >= 3: restored leader and at least one restored follower
    //   replica == 2: same under a regular leader; a restored leader alone is
    //                 enough under an assigned leader
    //   otherwise:    restored leader
    if (pVgroup->replica >= 3) {
      isReady = isLeaderRestored && hasFollowerRestored;
    } else if (pVgroup->replica == 2) {
      if (leaderState == TAOS_SYNC_STATE_LEADER) {
        isReady = isLeaderRestored && hasFollowerRestored;
      } else if (leaderState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        isReady = isLeaderRestored;
      }
    } else {
      isReady = isLeaderRestored;
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&isReady, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int64_t cacheUsage = (int64_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersion, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersionTime, false, pVgroup, pShow->pIter, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1378

1379
// Cancels an in-progress vgroup iteration by handing pIter to
// sdbCancelFetchByType() for the SDB_VGROUP table.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1383

1384
// Traverse callback used by mndGetVnodesNum.
//   pObj - the vgroup (SVgObj) being visited
//   p1   - points to the int32_t dnode id to match
//   p2   - points to the int32_t counter, bumped once per replica on that dnode
//   p3   - not used
// Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int32_t      *pCounter = (int32_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnode) {
      *pCounter += 1;
    }
    ++idx;
  }

  return true;
}
1397

1398
// Return the number of vgroup replicas located on dnodeId, obtained by running
// mndGetVnodesNumFp over every SDB_VGROUP object.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t matched = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &matched, NULL);
  return matched;
}
1403

1404
// Compute the memory figure of one vgroup from its database configuration:
//   cfg.buffer * 1024 * 1024 + cfg.pages * cfg.pageSize * 1024,
//   plus cfg.cacheLastSize * 1024 * 1024 when cfg.cacheLast > 0.
//   pDbInput - database object to use; when NULL the db is acquired by
//              pVgroup->dbName and released again before returning
//   pVgroup  - vgroup whose dbName / vgId are used for lookup and logging
// Returns 0 when no database object is available.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool acquiredHere = (pDbInput == NULL);
  SDbObj    *pDb = acquiredHere ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;
  int64_t    vgroupMemory = 0;

  if (pDb != NULL) {
    const int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    const int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    const int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;

    // cacheLast is always logged, but only counted when the feature is enabled
    vgroupMemory = buffer + cache;
    if (pDb->cfg.cacheLast > 0) {
      vgroupMemory += cacheLast;
    }
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  // only give back the reference this function took itself
  if (acquiredHere) {
    mndReleaseDb(pMnode, pDb);
  }
  return vgroupMemory;
}
1428

1429
// Traverse callback used by mndGetVnodesMemory.
//   pObj - the vgroup (SVgObj) being visited
//   p1   - points to the int32_t dnode id to match
//   p2   - points to the int64_t accumulator; mndGetVgroupMemory() of this
//          vgroup is added once per replica located on that dnode
//   p3   - not used
// Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int64_t      *pTotal = (int64_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnode) {
      *pTotal += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
    ++idx;
  }

  return true;
}
1442

1443
// Sum mndGetVgroupMemory() over every vgroup replica located on dnodeId, by
// running mndGetVnodeMemroyFp over all SDB_VGROUP objects.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t total = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);
  return total;
}
1448

1449
// Estimate the time needed to apply the remaining entries and write it to
// restoreStr as "hours:minutes:seconds" (no zero padding).
//   rate           - apply rate; applyCount / rate is interpreted as milliseconds
//   applyCount     - number of entries still to be applied
//   restoreStr     - output buffer; left untouched when NULL
//   restoreStrSize - size of restoreStr in bytes; output is truncated to fit
// A rate that is zero, negative or NaN cannot yield a meaningful estimate and
// produces "0:0:0".
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  if (restoreStr == NULL || restoreStrSize == 0) {
    return;
  }

  // !(rate > 0) is also true for NaN, which a plain "rate <= 0" would miss
  if (!(rate > 0)) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  // Converting an out-of-range double to int64_t is undefined behaviour, so
  // saturate the quotient before converting.
  double  costMs = (double)applyCount / rate;
  int64_t costTime = 0;
  if (costMs >= 9223372036854775807.0) {
    costTime = INT64_MAX;
  } else if (costMs <= -9223372036854775808.0) {
    costTime = INT64_MIN;
  } else {
    costTime = (int64_t)costMs;
  }

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1463

1464
// Fill pBlock with one row per vnode (one row for every replica of every
// vgroup visible to the requesting user).
//   pReq   - the show request; supplies the mnode and the caller identity
//   pShow  - show context; pShow->pIter carries the sdb iterator across calls
//            and pShow->numOfRows is advanced by the rows produced here
//   pBlock - output data block, columns are written in the order used below
//   rows   - row capacity of pBlock
// Returns the number of rows written, or a non-zero error code when a
// privilege check or a column write jumps to _OVER with code set.
//
// NOTE: several locals (pUser, pIterDb, objFName, showAll, showIter, dbUid,
// lino, ...) are not referenced directly below; presumably they are consumed by
// the MND_SHOW_CHECK_* / COL_DATA_SET_VAL_GOTO macros, so they must keep their
// names - confirm against the macro definitions before removing any of them.
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SUserObj *pUser = NULL;
  SDbObj   *pDb = NULL, *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VNODES, PRIV_OBJ_DB, 0, _OVER);

  // stop early enough that all replicas of the next vgroup still fit
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VNODES, _OVER);

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      // dnode id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->dnodeId, false, pVgroup, pShow->pIter, _OVER);
      // vgroup id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)b1, false, pVgroup, pShow->pIter, _OVER);

      // dnode is online?
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        // the half-written row is not counted; remaining replicas of this vgroup are skipped
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
      sdbRelease(pSdb, pDnode);

      // sync state, reported as offline whenever the hosting dnode is offline
      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      // role time, zero when the dnode is offline
      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&roleTimeMs, false, pVgroup, pShow->pIter, _OVER);

      // start time, zero when the dnode is offline
      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&startTimeMs, false, pVgroup, pShow->pIter, _OVER);

      // sync restore flag
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->syncRestore, false, pVgroup, pShow->pIter, _OVER);

      // estimated restore time, left empty when nothing is waiting to be applied
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      // The text is packed into buf behind a VARSTR_HEADER_SIZE-byte header, so
      // it must be that much shorter than buf; a full 19-character estimate
      // would otherwise not fit into buf[20].
      char restoreStr[sizeof(buf) - VARSTR_HEADER_SIZE] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      STR_TO_VARSTR(buf, restoreStr);
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      // number of committed but not yet applied entries
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&unappliedCount, false, pVgroup, pShow->pIter, _OVER);

      // buffer segment usage
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentUsed, false, pVgroup, pShow->pIter, _OVER);

      // buffer segment size
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentSize, false, pVgroup, pShow->pIter, _OVER);

      numOfRows++;
    }
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve vnode info at line %d since %s", lino, tstrerror(code));
    return code;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1566

1567
// Cancel an in-progress vnode listing. Vnode rows are produced by iterating
// SDB_VGROUP, so that is the table whose iterator is handed back.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1571

1572
// Add exactly one replica to pVgroup and record the updated vgroup in pTrans.
//   pArray - candidate dnodes (SDnodeObj); sorted in place with
//            mndCompareDnodeVnodes, then the first dnode in that order that
//            does not already host a replica of this vgroup is chosen
// On success the new replica occupies vnodeGid[old replica count], the vgroup's
// replica count and the dnode's numOfVnodes / memUsed are increased, and the
// encoded vgroup is appended to the transaction as a group redo log.
// Returns 0 on success, otherwise:
//   TSDB_CODE_MND_NO_ENOUGH_DNODES       - no usable candidate in pArray
//   TSDB_CODE_MND_NO_ENOUGH_VNODES       - the chosen dnode is at its vnode limit
//   TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE - the chosen dnode lacks memory
//   or the error reported by the encode / append / set-status step.
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // must be checked before pDnode->id is read below
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    // skip dnodes that already host a replica of this vgroup
    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      // the raw is freed here only when appending failed
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    // only one replica is added per call
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1642

1643
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
20,578✔
1644
                                        SVnodeGid *pDelVgid) {
1645
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
20,578✔
1646
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
79,098✔
1647
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
58,520✔
1648
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
58,520✔
1649
          pDnode->numOfOtherNodes);
1650
  }
1651

1652
  int32_t code = -1;
20,578✔
1653
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
23,308✔
1654
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
22,409✔
1655

1656
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
41,256✔
1657
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
38,526✔
1658
      if (pVgid->dnodeId == pDnode->id) {
38,526✔
1659
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
19,679✔
1660
        pDnode->memUsed -= vgMem;
19,679✔
1661
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
19,679✔
1662
              " used:%" PRId64,
1663
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1664
        pDnode->numOfVnodes--;
19,679✔
1665
        pVgroup->replica--;
19,679✔
1666
        *pDelVgid = *pVgid;
19,679✔
1667
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
19,679✔
1668
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
19,679✔
1669
        code = 0;
19,679✔
1670
        goto _OVER;
19,679✔
1671
      }
1672
    }
1673
  }
1674

1675
_OVER:
899✔
1676
  if (code != 0) {
20,578✔
1677
    code = TSDB_CODE_APP_ERROR;
899✔
1678
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
899✔
1679
           tstrerror(code));
1680
    TAOS_RETURN(code);
899✔
1681
  }
1682

1683
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
55,462✔
1684
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
35,783✔
1685
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
35,783✔
1686
          pVgid->dnodeId);
1687
  }
1688

1689
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
19,679✔
1690
  if (pVgRaw == NULL) {
19,679✔
1691
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1692
    if (terrno != 0) code = terrno;
×
1693
    TAOS_RETURN(code);
×
1694
  }
1695
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
19,679✔
1696
    sdbFreeRaw(pVgRaw);
×
1697
    TAOS_RETURN(code);
×
1698
  }
1699
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
19,679✔
1700
  if (code != 0) {
19,679✔
1701
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1702
           tstrerror(code), __LINE__);
1703
  }
1704

1705
  TAOS_RETURN(code);
19,679✔
1706
}
1707

1708
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1709
                                                   SVnodeGid *pDelVgid) {
1710
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1711
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1712
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1713
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1714
  }
1715

1716
  int32_t code = -1;
×
1717
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1718
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1719

1720
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1721
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1722
      if (pVgid->dnodeId == pDnode->id) {
×
1723
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1724
        pDnode->memUsed -= vgMem;
×
1725
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1726
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1727
        pDnode->numOfVnodes--;
×
1728
        pVgroup->replica--;
×
1729
        *pDelVgid = *pVgid;
×
1730
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1731
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1732
        code = 0;
×
1733
        goto _OVER;
×
1734
      }
1735
    }
1736
  }
1737

1738
_OVER:
×
1739
  if (code != 0) {
×
1740
    code = TSDB_CODE_APP_ERROR;
×
1741
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1742
    TAOS_RETURN(code);
×
1743
  }
1744

1745
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1746
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1747
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1748
  }
1749

1750
  TAOS_RETURN(code);
×
1751
}
1752

1753
// Append a TDMT_DND_CREATE_VNODE redo action for the replica pVgid of pVgroup
// to pTrans. The action is addressed to the dnode named by pVgid->dnodeId,
// accepts TSDB_CODE_VND_ALREADY_EXIST as a reply, and is grouped by vgId.
// Returns 0 on success; when the dnode cannot be acquired or the request cannot
// be built, returns terrno if it is set, else TSDB_CODE_MND_RETURN_VALUE_NULL;
// otherwise the error from mndTransAppendRedoAction.
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // pDnode is handed to the request builder, so the reference is kept until
  // the builder has returned instead of being released beforehand.
  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // pick up terrno before the release call gets a chance to change it
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    // the request buffer is freed here only when appending failed
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1779

1780
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,455✔
1781
                                       SDnodeObj *pDnode) {
1782
  int32_t      code = 0;
2,455✔
1783
  STransAction action = {0};
2,455✔
1784

1785
  action.epSet = mndGetDnodeEpset(pDnode);
2,455✔
1786

1787
  int32_t contLen = 0;
2,455✔
1788
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
2,455✔
1789
  if (pReq == NULL) {
2,455✔
1790
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1791
    if (terrno != 0) code = terrno;
×
1792
    TAOS_RETURN(code);
×
1793
  }
1794

1795
  action.pCont = pReq;
2,455✔
1796
  action.contLen = contLen;
2,455✔
1797
  action.msgType = TDMT_DND_CREATE_VNODE;
2,455✔
1798
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
2,455✔
1799
  action.groupId = pVgroup->vgId;
2,455✔
1800

1801
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,455✔
1802
    taosMemoryFree(pReq);
×
1803
    TAOS_RETURN(code);
×
1804
  }
1805

1806
  TAOS_RETURN(code);
2,455✔
1807
}
1808

1809
// Append a TDMT_VND_ALTER_CONFIRM redo action for pVgroup to pTrans. The
// message is a bare SMsgHead (contLen and vgId in network byte order) sent to
// the vgroup's endpoint set; pDb is not used.
// Returns 0 on success, terrno when the header cannot be allocated, or the
// error from mndTransAppendRedoAction.
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction redo = {0};
  int32_t      code = 0;

  redo.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);
  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(headLen);
  if (pMsg == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsg->contLen = htonl(headLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  redo.msgType = TDMT_VND_ALTER_CONFIRM;
  redo.groupId = pVgroup->vgId;
  // an incorrect redirect result will cause this error, so it is retried
  redo.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  redo.pCont = pMsg;
  redo.contLen = headLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the header is freed here only when appending failed
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1838

1839
// Append a TDMT_SYNC_CONFIG_CHANGE redo action to pTrans, sent to the endpoint
// set of pNewVgroup. The message is an SMsgHead (contLen and vgId in network
// byte order) followed by the alter-replica request built for dnodeId.
// pOldVgroup is not used.
// Returns 0 on success; when the request cannot be built, terrno if set, else
// TSDB_CODE_MND_RETURN_VALUE_NULL; terrno when the message buffer cannot be
// allocated; otherwise the append error.
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  STransAction redo = {0};
  int32_t      bodyLen = 0;
  int32_t      code = 0;

  redo.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  void *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // header + body are sent as one contiguous buffer
  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }
  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);  // the body now lives inside pMsg

  redo.msgType = TDMT_SYNC_CONFIG_CHANGE;
  redo.pCont = pMsg;
  redo.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the message is freed here only when appending failed
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1878

1879
// Append a TDMT_VND_ALTER_HASHRANGE redo action to pTrans, sent to the endpoint
// set of pVgroup; the request is built from srcVgId and pVgroup.
// TSDB_CODE_VND_ALREADY_EXIST is accepted as a reply.
// Returns 0 on success; when the request cannot be built, terrno if set, else
// TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the append error.
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  STransAction redo = {0};
  int32_t      bodyLen = 0;
  int32_t      code = 0;

  redo.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pBody = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_VND_ALTER_HASHRANGE;
  redo.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  redo.pCont = pBody;
  redo.contLen = bodyLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the body is freed here only when appending failed
    taosMemoryFree(pBody);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1905

1906
// Append a TDMT_VND_ALTER_CONFIG redo action for pVgroup to pTrans, sent to the
// vgroup's endpoint set and grouped by vgId.
// Returns 0 on success; when the request cannot be built, terrno if set, else
// TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the append error.
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction redo = {0};
  int32_t      bodyLen = 0;
  int32_t      code = 0;

  redo.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pBody = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_VND_ALTER_CONFIG;
  redo.groupId = pVgroup->vgId;
  redo.pCont = pBody;
  redo.contLen = bodyLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the body is freed here only when appending failed
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1931

1932
// Encode pVg and append it to pTrans as a prepare log, then mark the raw as
// SDB_STATUS_CREATING.
// Returns 0 on success; when encoding fails, terrno if set, else
// TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error from appending the
// prepare log or from setting the raw status.
// The raw is freed here only when it could not be appended to the transaction.
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _err;
  }

  // NOTE(review): presumably TAOS_CHECK_GOTO stores the call result in `code`
  // before jumping - confirm against the macro definition.
  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);

  // Store the status result in code so a failure is reported to the caller
  // instead of only being logged.
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  // pRaw was appended successfully above, so it is not freed on this path
  TAOS_RETURN(code);

_err:
  sdbFreeRaw(pRaw);
  TAOS_RETURN(code);
}
1956

1957
// Append a TDMT_VND_ALTER_REPLICA redo action to pTrans, addressed to the
// endpoint set of dnode dnodeId and grouped by vgId.
// Returns 0 on success; when the dnode cannot be acquired or the request cannot
// be built, terrno if set, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the
// append error.
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // only the endpoint set is needed from the dnode object
  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t bodyLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_VND_ALTER_REPLICA;
  redo.groupId = pVgroup->vgId;
  redo.pCont = pBody;
  redo.contLen = bodyLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the body is freed here only when appending failed
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
1990

1991
// Append a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action to pTrans,
// addressed to the endpoint set of dnode dnodeId.
// TSDB_CODE_VND_ALREADY_IS_VOTER is accepted as a reply and
// TSDB_CODE_VND_NOT_CATCH_UP is the retry code. No groupId is set.
// Returns 0 on success; when the dnode cannot be acquired or the request cannot
// be built, terrno if set, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the
// append error.
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // only the endpoint set is needed from the dnode object
  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t bodyLen = 0;
  void   *pBody = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  redo.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  redo.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  redo.pCont = pBody;
  redo.contLen = bodyLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the body is freed here only when appending failed
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
2025

2026
// Append a TDMT_DND_ALTER_VNODE_TYPE redo action to pTrans, addressed to the
// endpoint set of dnode dnodeId and grouped by vgId. The body is built with
// mndBuildAlterVnodeReplicaReq. TSDB_CODE_VND_ALREADY_IS_VOTER is accepted as a
// reply and TSDB_CODE_VND_NOT_CATCH_UP is the retry code.
// Returns 0 on success; when the dnode cannot be acquired or the request cannot
// be built, terrno if set, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the
// append error.
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // only the endpoint set is needed from the dnode object
  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t bodyLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  redo.groupId = pVgroup->vgId;
  redo.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  redo.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  redo.pCont = pBody;
  redo.contLen = bodyLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // the body is freed here only when appending failed
    taosMemoryFree(pBody);
  }
  TAOS_RETURN(code);
}
2061

2062
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,455✔
2063
                                          SDnodeObj *pDnode) {
2064
  int32_t      code = 0;
2,455✔
2065
  STransAction action = {0};
2,455✔
2066
  action.epSet = mndGetDnodeEpset(pDnode);
2,455✔
2067

2068
  int32_t contLen = 0;
2,455✔
2069
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
2,455✔
2070
  if (pReq == NULL) {
2,455✔
2071
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2072
    if (terrno != 0) code = terrno;
×
2073
    TAOS_RETURN(code);
×
2074
  }
2075

2076
  action.pCont = pReq;
2,455✔
2077
  action.contLen = contLen;
2,455✔
2078
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
2,455✔
2079
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
2,455✔
2080
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
2,455✔
2081
  action.groupId = pVgroup->vgId;
2,455✔
2082

2083
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,455✔
2084
    taosMemoryFree(pReq);
×
2085
    TAOS_RETURN(code);
×
2086
  }
2087

2088
  TAOS_RETURN(code);
2,455✔
2089
}
2090

2091
/*
 * Queue a redo action on pTrans that sends TDMT_VND_DISABLE_WRITE for
 * pVgroup->vgId to the dnode identified by dnodeId.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
 * Otherwise returns the code reported by mndTransAppendRedoAction().
 */
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  int32_t      code = 0;
  STransAction disableAction = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // the dnode is only needed for its endpoint set, so it is released right after
  disableAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCont = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  disableAction.pCont = pCont;
  disableAction.contLen = reqLen;
  disableAction.msgType = TDMT_VND_DISABLE_WRITE;

  code = mndTransAppendRedoAction(pTrans, &disableAction);
  if (code != 0) {
    // the payload is freed here when the append fails
    taosMemoryFree(pCont);
  }

  TAOS_RETURN(code);
}
2124

2125
/*
 * Queue an action on pTrans that sends TDMT_DND_DROP_VNODE for pVgroup to the
 * dnode referenced by pVgid->dnodeId.
 *
 * isRedo selects the list the action is appended to: the redo actions when
 * true, the undo actions when false. TSDB_CODE_VND_NOT_EXIST is registered as
 * the acceptable code.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
 * Otherwise returns the code reported by the append call.
 */
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure reason before any further call can touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }

  // pDnode is handed to mndBuildDropVnodeReq() above, so the reference must be
  // held until that call has returned; release it only now.
  mndReleaseDnode(pMnode, pDnode);

  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  if (isRedo) {
    code = mndTransAppendRedoAction(pTrans, &action);
  } else {
    code = mndTransAppendUndoAction(pTrans, &action);
  }
  if (code != 0) {
    // the payload is freed here when the append fails
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2167

2168
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
15,165✔
2169
                                    SArray *pArray, bool force, bool unsafe) {
2170
  int32_t code = 0;
15,165✔
2171
  SVgObj  newVg = {0};
15,165✔
2172
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
15,165✔
2173

2174
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
15,165✔
2175
  for (int32_t i = 0; i < newVg.replica; ++i) {
48,980✔
2176
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
33,815✔
2177
  }
2178

2179
  if (!force) {
15,165✔
2180
#if 1
2181
    {
2182
#else
2183
    if (newVg.replica == 1) {
2184
#endif
2185
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
15,165✔
2186
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
15,165✔
2187
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
48,980✔
2188
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
33,815✔
2189
      }
2190
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
15,165✔
2191
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
15,165✔
2192

2193
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
15,165✔
2194
      newVg.replica--;
15,165✔
2195
      SVnodeGid del = newVg.vnodeGid[vnIndex];
15,165✔
2196
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
15,165✔
2197
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
15,165✔
2198
      {
2199
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
15,165✔
2200
        if (pRaw == NULL) {
15,165✔
2201
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2202
          if (terrno != 0) code = terrno;
×
2203
          TAOS_RETURN(code);
×
2204
        }
2205
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
15,165✔
2206
          sdbFreeRaw(pRaw);
×
2207
          TAOS_RETURN(code);
×
2208
        }
2209
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
15,165✔
2210
        if (code != 0) {
15,165✔
2211
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2212
          return code;
×
2213
        }
2214
      }
2215

2216
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
15,165✔
2217
      for (int32_t i = 0; i < newVg.replica; ++i) {
48,980✔
2218
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
33,815✔
2219
      }
2220
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
15,165✔
2221
#if 1
2222
    }
2223
#else
2224
    } else {  // new replica == 3
2225
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2226
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2227
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2228
      newVg.replica--;
2229
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2230
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2231
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2232
      {
2233
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2234
        if (pRaw == NULL) return -1;
2235
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2236
          sdbFreeRaw(pRaw);
2237
          return -1;
2238
        }
2239
      }
2240

2241
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2242
      for (int32_t i = 0; i < newVg.replica; ++i) {
2243
        if (i == vnIndex) continue;
2244
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2245
      }
2246
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2247
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2248
    }
2249
#endif
2250
  } else {
2251
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2252
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2253
    newVg.replica--;
×
2254
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2255
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2256
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2257
    {
2258
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2259
      if (pRaw == NULL) {
×
2260
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2261
        if (terrno != 0) code = terrno;
×
2262
        TAOS_RETURN(code);
×
2263
      }
2264
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2265
        sdbFreeRaw(pRaw);
×
2266
        TAOS_RETURN(code);
×
2267
      }
2268
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2269
      if (code != 0) {
×
2270
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2271
        return code;
×
2272
      }
2273
    }
2274

2275
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2276
      if (i != vnIndex) {
×
2277
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2278
      }
2279
    }
2280
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2281
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2282

2283
    if (newVg.replica == 1) {
×
2284
      if (force && !unsafe) {
×
2285
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2286
      }
2287

2288
      SSdb *pSdb = pMnode->pSdb;
×
2289
      void *pIter = NULL;
×
2290

2291
      while (1) {
×
2292
        SStbObj *pStb = NULL;
×
2293
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2294
        if (pIter == NULL) break;
×
2295

2296
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2297
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2298
            sdbCancelFetch(pSdb, pIter);
×
2299
            sdbRelease(pSdb, pStb);
×
2300
            TAOS_RETURN(code);
×
2301
          }
2302
        }
2303

2304
        sdbRelease(pSdb, pStb);
×
2305
      }
2306

2307
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2308
    }
2309
  }
2310

2311
  {
2312
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
15,165✔
2313
    if (pRaw == NULL) {
15,165✔
2314
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2315
      if (terrno != 0) code = terrno;
×
2316
      TAOS_RETURN(code);
×
2317
    }
2318
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
15,165✔
2319
      sdbFreeRaw(pRaw);
×
2320
      TAOS_RETURN(code);
×
2321
    }
2322
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
15,165✔
2323
    if (code != 0) {
15,165✔
2324
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2325
      return code;
×
2326
    }
2327
  }
2328

2329
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
15,165✔
2330
  for (int32_t i = 0; i < newVg.replica; ++i) {
48,980✔
2331
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
33,815✔
2332
  }
2333
  TAOS_RETURN(code);
15,165✔
2334
}
2335

2336
/*
 * For every vgroup that has a replica on dnode delDnodeId, add to pTrans the
 * logs and actions that move that replica elsewhere (see
 * mndSetMoveVgroupInfoToTrans()). force and unsafe are forwarded unchanged.
 *
 * Iteration stops at the first vgroup that fails. Returns 0 when all matching
 * vgroups were processed, otherwise the failing code. If the candidate dnode
 * array or a vgroup's database cannot be obtained, the code is terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
 */
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // locate the replica of this vgroup that lives on the dnode being removed
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // the move routine dereferences pDb, so a missing db must stop here
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      // leaving the loop early, so the pending iterator is cancelled explicitly
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2379

2380
/*
 * Grow pVgroup by one replica placed on dnode newDnodeId and add the matching
 * logs/actions to pTrans. pVgroup is modified in place.
 *
 * Steps, in order:
 *   1. append the new entry (offline, learner role) and write a redo log;
 *   2. alter the pre-existing replicas and create the new vnode;
 *   3. switch the new entry to voter role, send the alter-type action to it
 *      and alter the pre-existing replicas again;
 *   4. add the confirm action.
 *
 * Returns 0 on success, otherwise the first non-zero code encountered. If the
 * vgroup cannot be encoded, the code is terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0.
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // take the first unused slot for the new replica
  SVnodeGid *pAdded = &pVgroup->vnodeGid[pVgroup->replica++];
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  SSdbRaw *pRedoRaw = mndVgroupActionEncode(pVgroup);
  if (pRedoRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // learner phase: every replica except the new (last) one is altered first
  for (int32_t idx = 0; idx < pVgroup->replica - 1; ++idx) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // voter phase
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  for (int32_t idx = 0; idx < pVgroup->replica - 1; ++idx) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
  }

  // confirm phase
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2426

2427
/*
 * Shrink pVgroup by removing the replica that lives on dnode delDnodeId and
 * add the matching logs/actions to pTrans. pVgroup is modified in place.
 *
 * The removed slot is filled with the last entry of vnodeGid and the last
 * entry is zeroed. A redo log with the new layout is appended, followed by a
 * drop action for the removed vnode, alter actions for the remaining replicas
 * and a confirm action.
 *
 * Returns 0 when delDnodeId is not part of the vgroup (nothing is changed) or
 * on success; otherwise the first non-zero code encountered. If the vgroup
 * cannot be encoded, the code is terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL
 * when terrno is 0.
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  SVnodeGid *pGid = NULL;
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      pGid = &pVgroup->vnodeGid[i];
      break;
    }
  }

  if (pGid == NULL) return 0;

  pVgroup->replica--;
  SVnodeGid delGid = *pGid;
  // When the removed vnode is the last entry, source and destination are the
  // same object. Struct assignment is well defined for that case, whereas
  // memcpy() on overlapping objects is not.
  *pGid = pVgroup->vnodeGid[pVgroup->replica];
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2472

2473
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
38,520✔
2474
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2475
                                     SDnodeObj *pOld3) {
2476
  int32_t code = -1;
38,520✔
2477
  STrans *pTrans = NULL;
38,520✔
2478

2479
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
38,520✔
2480
  if (pTrans == NULL) {
38,520✔
2481
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2482
    if (terrno != 0) code = terrno;
×
2483
    goto _OVER;
×
2484
  }
2485

2486
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
38,520✔
2487
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
38,520✔
2488
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);
38,452✔
2489

2490
  mndTransSetSerial(pTrans);
38,452✔
2491
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
38,452✔
2492

2493
  SVgObj newVg = {0};
38,452✔
2494
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
38,452✔
2495
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
38,452✔
2496
  for (int32_t i = 0; i < newVg.replica; ++i) {
129,682✔
2497
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
91,230✔
2498
          syncStr(newVg.vnodeGid[i].syncState));
2499
  }
2500

2501
  if (pNew1 != NULL && pOld1 != NULL) {
38,452✔
2502
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
38,452✔
2503
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
38,452✔
2504
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
712✔
2505
             pNew1->numOfSupportVnodes);
2506
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
712✔
2507
      goto _OVER;
712✔
2508
    }
2509

2510
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
37,740✔
2511
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
37,740✔
2512
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2513
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2514
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2515
      goto _OVER;
×
2516
    } else {
2517
      pNew1->memUsed += vgMem;
37,740✔
2518
    }
2519

2520
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
37,740✔
2521
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
37,740✔
2522
  }
2523

2524
  if (pNew2 != NULL && pOld2 != NULL) {
37,740✔
2525
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
11,382✔
2526
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
11,382✔
2527
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2528
             pNew2->numOfSupportVnodes);
2529
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2530
      goto _OVER;
×
2531
    }
2532
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
11,382✔
2533
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
11,382✔
2534
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2535
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2536
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2537
      goto _OVER;
×
2538
    } else {
2539
      pNew2->memUsed += vgMem;
11,382✔
2540
    }
2541
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
11,382✔
2542
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
11,382✔
2543
  }
2544

2545
  if (pNew3 != NULL && pOld3 != NULL) {
37,740✔
2546
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
4,460✔
2547
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
4,460✔
2548
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2549
             pNew3->numOfSupportVnodes);
2550
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2551
      goto _OVER;
×
2552
    }
2553
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
4,460✔
2554
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
4,460✔
2555
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2556
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2557
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2558
      goto _OVER;
×
2559
    } else {
2560
      pNew3->memUsed += vgMem;
4,460✔
2561
    }
2562
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
4,460✔
2563
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
4,460✔
2564
  }
2565

2566
  {
2567
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
37,740✔
2568
    if (pRaw == NULL) {
37,740✔
2569
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2570
      if (terrno != 0) code = terrno;
×
2571
      goto _OVER;
×
2572
    }
2573
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
37,740✔
2574
      sdbFreeRaw(pRaw);
×
2575
      goto _OVER;
×
2576
    }
2577
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
37,740✔
2578
    if (code != 0) {
37,740✔
2579
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2580
      goto _OVER;
×
2581
    }
2582
  }
2583

2584
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
37,740✔
2585
  for (int32_t i = 0; i < newVg.replica; ++i) {
126,834✔
2586
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
89,094✔
2587
  }
2588

2589
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
37,740✔
2590
  code = 0;
37,060✔
2591

2592
_OVER:
38,520✔
2593
  mndTransDrop(pTrans);
38,520✔
2594
  mndReleaseDb(pMnode, pDb);
38,520✔
2595
  TAOS_RETURN(code);
38,520✔
2596
}
2597

2598
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
46,954✔
2599
  SMnode    *pMnode = pReq->info.node;
46,954✔
2600
  SDnodeObj *pNew1 = NULL;
46,954✔
2601
  SDnodeObj *pNew2 = NULL;
46,954✔
2602
  SDnodeObj *pNew3 = NULL;
46,954✔
2603
  SDnodeObj *pOld1 = NULL;
46,954✔
2604
  SDnodeObj *pOld2 = NULL;
46,954✔
2605
  SDnodeObj *pOld3 = NULL;
46,954✔
2606
  SVgObj    *pVgroup = NULL;
46,954✔
2607
  SDbObj    *pDb = NULL;
46,954✔
2608
  int32_t    code = -1;
46,954✔
2609
  int64_t    curMs = taosGetTimestampMs();
46,954✔
2610
  int32_t    newDnodeId[3] = {0};
46,954✔
2611
  int32_t    oldDnodeId[3] = {0};
46,954✔
2612
  int32_t    newIndex = -1;
46,954✔
2613
  int32_t    oldIndex = -1;
46,954✔
2614
  int64_t    tss = taosGetTimestampMs();
46,954✔
2615

2616
  SRedistributeVgroupReq req = {0};
46,954✔
2617
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
46,954✔
2618
    code = TSDB_CODE_INVALID_MSG;
×
2619
    goto _OVER;
×
2620
  }
2621

2622
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
46,954✔
2623
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
46,954✔
2624
    goto _OVER;
×
2625
  }
2626

2627
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
46,954✔
2628
  if (pVgroup == NULL) {
46,954✔
2629
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
2,136✔
2630
    if (terrno != 0) code = terrno;
2,136✔
2631
    goto _OVER;
2,136✔
2632
  }
2633
  if (pVgroup->mountVgId) {
44,818✔
2634
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
2635
    goto _OVER;
×
2636
  }
2637
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
44,818✔
2638
  if (pDb == NULL) {
44,818✔
2639
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2640
    if (terrno != 0) code = terrno;
×
2641
    goto _OVER;
×
2642
  }
2643

2644
  if (pVgroup->replica == 1) {
44,818✔
2645
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
11,402✔
2646
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2647
      goto _OVER;
×
2648
    }
2649

2650
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
11,402✔
2651
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2652
      code = 0;
×
2653
      goto _OVER;
×
2654
    }
2655

2656
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
11,402✔
2657
    if (pNew1 == NULL) {
11,402✔
2658
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2659
      if (terrno != 0) code = terrno;
×
2660
      goto _OVER;
×
2661
    }
2662
    if (!mndIsDnodeOnline(pNew1, curMs)) {
11,402✔
2663
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2664
      goto _OVER;
×
2665
    }
2666

2667
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
11,402✔
2668
    if (pOld1 == NULL) {
11,402✔
2669
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2670
      if (terrno != 0) code = terrno;
×
2671
      goto _OVER;
×
2672
    }
2673
    if (!mndIsDnodeOnline(pOld1, curMs)) {
11,402✔
2674
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2675
      goto _OVER;
×
2676
    }
2677

2678
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
11,402✔
2679

2680
  } else if (pVgroup->replica == 3) {
33,416✔
2681
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
31,958✔
2682
      code = TSDB_CODE_MND_INVALID_REPLICA;
2,848✔
2683
      goto _OVER;
2,848✔
2684
    }
2685

2686
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
29,110✔
2687
      code = TSDB_CODE_MND_INVALID_REPLICA;
712✔
2688
      goto _OVER;
712✔
2689
    }
2690

2691
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
28,398✔
2692
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
14,100✔
2693
      newDnodeId[++newIndex] = req.dnodeId1;
11,964✔
2694
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
11,964✔
2695
    }
2696

2697
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
28,398✔
2698
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
18,910✔
2699
      newDnodeId[++newIndex] = req.dnodeId2;
13,931✔
2700
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
13,931✔
2701
    }
2702

2703
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
28,398✔
2704
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
21,283✔
2705
      newDnodeId[++newIndex] = req.dnodeId3;
18,251✔
2706
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
18,251✔
2707
    }
2708

2709
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
28,398✔
2710
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
14,754✔
2711
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
11,906✔
2712
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
11,906✔
2713
    }
2714

2715
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
28,398✔
2716
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
18,256✔
2717
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
13,989✔
2718
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
13,989✔
2719
    }
2720

2721
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
28,398✔
2722
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
21,283✔
2723
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
18,251✔
2724
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
18,251✔
2725
    }
2726

2727
    if (newDnodeId[0] != 0) {
28,398✔
2728
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
27,502✔
2729
      if (pNew1 == NULL) {
27,502✔
2730
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2731
        if (terrno != 0) code = terrno;
×
2732
        goto _OVER;
×
2733
      }
2734
      if (!mndIsDnodeOnline(pNew1, curMs)) {
27,502✔
2735
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
712✔
2736
        goto _OVER;
712✔
2737
      }
2738
    }
2739

2740
    if (newDnodeId[1] != 0) {
27,686✔
2741
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
11,054✔
2742
      if (pNew2 == NULL) {
11,054✔
2743
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2744
        if (terrno != 0) code = terrno;
×
2745
        goto _OVER;
×
2746
      }
2747
      if (!mndIsDnodeOnline(pNew2, curMs)) {
11,054✔
UNCOV
2748
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2749
        goto _OVER;
×
2750
      }
2751
    }
2752

2753
    if (newDnodeId[2] != 0) {
27,686✔
2754
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
5,590✔
2755
      if (pNew3 == NULL) {
5,590✔
2756
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2757
        if (terrno != 0) code = terrno;
×
2758
        goto _OVER;
×
2759
      }
2760
      if (!mndIsDnodeOnline(pNew3, curMs)) {
5,590✔
2761
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2762
        goto _OVER;
×
2763
      }
2764
    }
2765

2766
    if (oldDnodeId[0] != 0) {
27,686✔
2767
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
26,790✔
2768
      if (pOld1 == NULL) {
26,790✔
2769
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2770
        if (terrno != 0) code = terrno;
×
2771
        goto _OVER;
×
2772
      }
2773
      if (!mndIsDnodeOnline(pOld1, curMs)) {
26,790✔
2774
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1,130✔
2775
        goto _OVER;
1,130✔
2776
      }
2777
    }
2778

2779
    if (oldDnodeId[1] != 0) {
26,556✔
2780
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
9,924✔
2781
      if (pOld2 == NULL) {
9,924✔
2782
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2783
        if (terrno != 0) code = terrno;
×
2784
        goto _OVER;
×
2785
      }
2786
      if (!mndIsDnodeOnline(pOld2, curMs)) {
9,924✔
2787
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2788
        goto _OVER;
×
2789
      }
2790
    }
2791

2792
    if (oldDnodeId[2] != 0) {
26,556✔
2793
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
4,460✔
2794
      if (pOld3 == NULL) {
4,460✔
2795
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2796
        if (terrno != 0) code = terrno;
×
2797
        goto _OVER;
×
2798
      }
2799
      if (!mndIsDnodeOnline(pOld3, curMs)) {
4,460✔
2800
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2801
        goto _OVER;
×
2802
      }
2803
    }
2804

2805
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
26,556✔
2806
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2807
      code = 0;
896✔
2808
      goto _OVER;
896✔
2809
    }
2810

2811
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
25,660✔
2812

2813
  } else if (pVgroup->replica == 2) {
1,458✔
2814
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
1,458✔
2815
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2816
      goto _OVER;
×
2817
    }
2818

2819
    if (req.dnodeId1 == req.dnodeId2) {
1,458✔
2820
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2821
      goto _OVER;
×
2822
    }
2823

2824
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
1,458✔
2825
      newDnodeId[++newIndex] = req.dnodeId1;
1,458✔
2826
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,458✔
2827
    }
2828

2829
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,458✔
2830
      newDnodeId[++newIndex] = req.dnodeId2;
1,458✔
2831
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,458✔
2832
    }
2833

2834
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
1,458✔
2835
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
1,458✔
2836
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,458✔
2837
    }
2838

2839
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,458✔
2840
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
1,458✔
2841
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,458✔
2842
    }
2843

2844
    if (newDnodeId[0] != 0) {
1,458✔
2845
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
1,458✔
2846
      if (pNew1 == NULL) {
1,458✔
2847
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2848
        if (terrno != 0) code = terrno;
×
2849
        goto _OVER;
×
2850
      }
2851
      if (!mndIsDnodeOnline(pNew1, curMs)) {
1,458✔
2852
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2853
        goto _OVER;
×
2854
      }
2855
    }
2856

2857
    if (newDnodeId[1] != 0) {
1,458✔
2858
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
1,458✔
2859
      if (pNew2 == NULL) {
1,458✔
2860
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2861
        if (terrno != 0) code = terrno;
×
2862
        goto _OVER;
×
2863
      }
2864
      if (!mndIsDnodeOnline(pNew2, curMs)) {
1,458✔
2865
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2866
        goto _OVER;
×
2867
      }
2868
    }
2869

2870
    if (oldDnodeId[0] != 0) {
1,458✔
2871
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
1,458✔
2872
      if (pOld1 == NULL) {
1,458✔
2873
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2874
        if (terrno != 0) code = terrno;
×
2875
        goto _OVER;
×
2876
      }
2877
      if (!mndIsDnodeOnline(pOld1, curMs)) {
1,458✔
2878
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2879
        goto _OVER;
×
2880
      }
2881
    }
2882

2883
    if (oldDnodeId[1] != 0) {
1,458✔
2884
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
1,458✔
2885
      if (pOld2 == NULL) {
1,458✔
2886
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2887
        if (terrno != 0) code = terrno;
×
2888
        goto _OVER;
×
2889
      }
2890
      if (!mndIsDnodeOnline(pOld2, curMs)) {
1,458✔
2891
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2892
        goto _OVER;
×
2893
      }
2894
    }
2895

2896
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
1,458✔
2897
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2898
      code = 0;
×
2899
      goto _OVER;
×
2900
    }
2901

2902
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
1,458✔
2903
  } else {
2904
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2905
    goto _OVER;
×
2906
  }
2907

2908
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
38,520✔
2909

2910
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
38,520✔
2911
    char obj[33] = {0};
38,520✔
2912
    (void)snprintf(obj, sizeof(obj), "%d", req.vgId);
38,520✔
2913

2914
    int64_t tse = taosGetTimestampMs();
38,520✔
2915
    double  duration = (double)(tse - tss);
38,520✔
2916
    duration = duration / 1000;
38,520✔
2917
    auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen, duration, 0);
38,520✔
2918
  }
2919
_OVER:
46,954✔
2920
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
46,954✔
2921
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
8,998✔
2922
           req.dnodeId3, tstrerror(code));
2923
  }
2924

2925
  mndReleaseDnode(pMnode, pNew1);
46,954✔
2926
  mndReleaseDnode(pMnode, pNew2);
46,954✔
2927
  mndReleaseDnode(pMnode, pNew3);
46,954✔
2928
  mndReleaseDnode(pMnode, pOld1);
46,954✔
2929
  mndReleaseDnode(pMnode, pOld2);
46,954✔
2930
  mndReleaseDnode(pMnode, pOld3);
46,954✔
2931
  mndReleaseVgroup(pMnode, pVgroup);
46,954✔
2932
  mndReleaseDb(pMnode, pDb);
46,954✔
2933
  tFreeSRedistributeVgroupReq(&req);
46,954✔
2934

2935
  TAOS_RETURN(code);
46,954✔
2936
}
2937

2938
// Build the message body of a force-become-follower request for a vgroup.
//
// Layout of the returned buffer: an SMsgHead (contLen and vgId stored in network byte order) followed by
// the serialized SForceBecomeFollowerReq.
//
// pMnode   - not used by this function
// pVgroup  - vgroup whose vgId is written to both the header and the request
// dnodeId  - not used by this function
// pContLen - receives the total buffer size (header + payload) on success
//
// Returns a buffer allocated with taosMemoryMalloc (the caller releases it with taosMemoryFree), or NULL
// with terrno set to TSDB_CODE_OUT_OF_MEMORY on any failure.
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // a NULL buffer makes the serializer report only the size it needs
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload starts right after the header, so only bodyLen bytes are writable there. Passing the
  // full contLen would overstate the capacity by sizeof(SMsgHead).
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
2968

2969
// Append a TDMT_SYNC_FORCE_FOLLOWER redo action to pTrans, addressed to the endpoint set of dnode `dnodeId`
// and carrying the request built by mndBuildSForceBecomeFollowerReq for pVgroup.
//
// Returns 0 on success. If the dnode cannot be acquired or the request cannot be built, returns terrno when
// it is set and TSDB_CODE_MND_RETURN_VALUE_NULL otherwise. If the append fails, the request buffer is freed
// and the append's error code is returned.
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  void        *pCont = NULL;
  STransAction action = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  // the endpoint set is copied out before the dnode reference is dropped
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  pCont = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &reqLen);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pCont;
  action.contLen = reqLen;
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // the action was not appended, so the buffer is still ours to release
    taosMemoryFree(pCont);
  }

  TAOS_RETURN(code);
}
3001

3002
// Serialize an SAlterVnodeElectBaselineReq carrying the vgroup id and an election baseline of `ms`.
//
// pMnode, pDb and dnodeId are not used by this function. No SMsgHead is prepended here.
// NOTE(review): the request is encoded with tSerializeSAlterVnodeReplicaReq; presumably the two request
// types share a layout - confirm against the message definitions.
//
// Returns the buffer allocated with taosMemoryMalloc and stores its length in *pContLen, or returns NULL
// with terrno set to TSDB_CODE_OUT_OF_MEMORY.
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                                int32_t *pContLen, int32_t ms) {
  SAlterVnodeElectBaselineReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.electBaseLine = ms;

  // sizing pass: no buffer supplied
  int32_t encLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(encLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pBuf, encLen, &alterReq) >= 0) {
    *pContLen = encLen;
    return pBuf;
  }

  // encoding failed: log the serializer's error before terrno is overwritten
  mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
3030

3031
// Append a TDMT_VND_ALTER_ELECTBASELINE redo action to pTrans that sends an election baseline of `ms` to
// dnode `dnodeId` for pVgroup. The action's groupId is the vgroup id.
//
// Returns 0 on success. If the dnode cannot be acquired or the request cannot be built, returns terrno when
// it is set and TSDB_CODE_MND_RETURN_VALUE_NULL otherwise. If the append fails, the request buffer is freed
// and the append's error code is returned.
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                                             SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  void        *pCont = NULL;
  STransAction action = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  // the endpoint set is copied out before the dnode reference is dropped
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  pCont = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &reqLen, ms);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pCont;
  action.contLen = reqLen;
  action.msgType = TDMT_VND_ALTER_ELECTBASELINE;
  action.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // the action was not appended, so the buffer is still ours to release
    taosMemoryFree(pCont);
  }

  TAOS_RETURN(code);
}
3064

3065
// Append one election-baseline action per replica of pVgroup to pTrans.
//
// The replica in slot `index % 3` receives a baseline of 1500 ms and every other replica 5000 ms.
// A negative index (callers in this file pass -1) matches no slot, so every replica receives 5000 ms.
//
// Only the first `replica` slots of vnodeGid are visited. The loop used to be hard-coded to 3 slots, which
// for a 2-replica vgroup also targeted the unused third slot; presumably that slot's dnode id is 0 and
// cannot be acquired, so the whole call failed.
// NOTE(review): with replica == 2 and index % 3 == 2 no replica receives the short baseline; confirm
// whether callers expect `index % replica` instead.
//
// Returns 0 on success, -1 when the vgroup has at most one replica, or the first error reported while
// appending an action.
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans,
                                                              int32_t index) {
  int32_t code = 0;
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  for (int32_t i = 0; i < replica; i++) {
    int32_t dnodeId = pVgroup->vnodeGid[i].dnodeId;
    if (i == index % 3) {
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, dnodeId);
      TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, dnodeId, 1500));
    } else {
      TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, dnodeId, 5000));
    }
  }
  return code;
}
3090

3091
// Add to pTrans the actions that move leadership of pVgroup away from its current leader.
//
// Steps when the current leader's dnode exists and is online:
//   1. election-baseline actions using `index`
//   2. a force-follower action addressed to the current leader's dnode
//   3. an alter-vnode confirm action
//   4. election-baseline actions using index -1
//   5. a final check that the vgroup's database can still be acquired
// When no leader is recorded, or its dnode is missing or offline, nothing is appended and 0 is returned.
//
// Returns -1 when the vgroup has at most one replica, otherwise 0 or the first error encountered.
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t code = 0;
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (pVgroup->replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // dnode of the first replica reporting itself as leader; stays 0 when there is none
  int32_t dnodeId = 0;
  int     slot = 0;
  while (slot < replica) {
    if (pVgroup->vnodeGid[slot].syncState == TAOS_SYNC_STATE_LEADER) {
      dnodeId = pVgroup->vnodeGid[slot].dnodeId;
      break;
    }
    slot++;
  }

  bool       exist = false;
  bool       online = false;
  int64_t    curMs = taosGetTimestampMs();
  SDnodeObj *pLeader = mndAcquireDnode(pMnode, dnodeId);
  if (pLeader != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pLeader, curMs);
    mndReleaseDnode(pMnode, pLeader);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
          online);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgid, dnodeId);
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));

  code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));

  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));

  // the database is acquired only to verify that it still exists
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
3151

3152
// Implementation hook for the balance-vgroup-leader message. When TD_ENTERPRISE is not defined, the no-op
// stub below is compiled; otherwise the definition presumably comes from another translation unit.
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

// Message handler: forwards the request to the implementation hook and returns its result.
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  int32_t code = mndProcessVgroupBalanceLeaderMsgImp(pReq);
  return code;
}

#ifndef TD_ENTERPRISE
// Stub: accepts the request and reports success without doing anything.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3159

3160
// Re-account the memory of every dnode in pArray for a vgroup configuration change and verify that
// each dnode still has memory left.
//
// For each dnode: if it hosts a replica of pOldVgroup, the memory computed for (pOldDb, pOldVgroup) is
// subtracted from its memUsed; if it hosts a replica of pNewVgroup, the memory computed for
// (pNewDb, pNewVgroup) is added. The updated value is written back to the array element.
//
// Returns 0 when every dnode keeps memAvail - memUsed > 0, otherwise TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE
// at the first dnode that does not (later dnodes are left untouched).
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  const int32_t numOfDnodes = (int32_t)taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfDnodes; ++idx) {
    SDnodeObj *pNode = taosArrayGet(pArray, idx);
    bool       hostsReplica = false;
    int64_t    memBefore = 0;
    int64_t    memAfter = 0;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pNode->id, pNode->memAvail, pNode->memUsed);

    // memory currently charged to this dnode by the old vgroup layout
    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pNode->id != pOldVgroup->vnodeGid[r].dnodeId) continue;
      memBefore = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hostsReplica = true;
    }

    // memory the new vgroup layout will charge to this dnode
    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNode->id != pNewVgroup->vnodeGid[r].dnodeId) continue;
      memAfter = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hostsReplica = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pNode->id, memBefore, memAfter);

    pNode->memUsed = pNode->memUsed - memBefore + memAfter;

    if (pNode->memAvail - pNode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pNode->id, pNode->memAvail, pNode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    if (hostsReplica) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pNode->id, pNode->memAvail, pNode->memUsed);
    }
  }

  return 0;
}
3200

3201
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
251,870✔
3202
                                  SArray *pArray, SVgObj *pNewVgroup) {
3203
  int32_t code = 0;
251,870✔
3204
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
251,870✔
3205

3206
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
251,870✔
3207
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
208,227✔
3208
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
208,227✔
3209
    return 0;
208,227✔
3210
  }
3211

3212
  // mndTransSetGroupParallel(pTrans);
3213

3214
  if (pNewDb->cfg.replications == 3) {
43,643✔
3215
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
38,588✔
3216
          pVgroup->vnodeGid[0].dnodeId);
3217

3218
    // add second
3219
    if (pNewVgroup->replica == 1) {
38,588✔
3220
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
38,588✔
3221
    }
3222

3223
    // learner stage
3224
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
37,847✔
3225
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
37,847✔
3226
    TAOS_CHECK_RETURN(
37,847✔
3227
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3228

3229
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
37,847✔
3230

3231
    // follower stage
3232
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
37,847✔
3233
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
37,847✔
3234
    TAOS_CHECK_RETURN(
37,847✔
3235
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3236

3237
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
37,847✔
3238

3239
    // add third
3240
    if (pNewVgroup->replica == 2) {
37,847✔
3241
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
37,847✔
3242
    }
3243

3244
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
35,741✔
3245
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
35,741✔
3246
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
35,741✔
3247
    TAOS_CHECK_RETURN(
35,741✔
3248
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3249
    TAOS_CHECK_RETURN(
35,741✔
3250
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3251
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
35,741✔
3252

3253
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
35,741✔
3254
  } else if (pNewDb->cfg.replications == 1) {
5,055✔
3255
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
3,575✔
3256
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3257
          pVgroup->vnodeGid[2].dnodeId);
3258

3259
    SVnodeGid del1 = {0};
3,575✔
3260
    SVnodeGid del2 = {0};
3,575✔
3261
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
3,575✔
3262
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
3,575✔
3263
    TAOS_CHECK_RETURN(
3,575✔
3264
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3265
    TAOS_CHECK_RETURN(
3,575✔
3266
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3267
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,575✔
3268

3269
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
3,575✔
3270
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
3,575✔
3271
    TAOS_CHECK_RETURN(
3,575✔
3272
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3273
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,575✔
3274
  } else if (pNewDb->cfg.replications == 2) {
1,480✔
3275
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,480✔
3276
          pVgroup->vnodeGid[0].dnodeId);
3277

3278
    // add second
3279
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,480✔
3280

3281
    // learner stage
3282
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,480✔
3283
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,480✔
3284
    TAOS_CHECK_RETURN(
1,480✔
3285
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3286

3287
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,480✔
3288

3289
    // follower stage
3290
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,480✔
3291
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,480✔
3292
    TAOS_CHECK_RETURN(
1,480✔
3293
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3294

3295
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,480✔
3296
  } else {
3297
    return -1;
×
3298
  }
3299

3300
  mndSortVnodeGid(pNewVgroup);
40,796✔
3301

3302
  {
3303
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
40,796✔
3304
    if (pVgRaw == NULL) {
40,796✔
3305
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3306
      if (terrno != 0) code = terrno;
×
3307
      TAOS_RETURN(code);
×
3308
    }
3309
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
40,796✔
3310
      sdbFreeRaw(pVgRaw);
×
3311
      TAOS_RETURN(code);
×
3312
    }
3313
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
40,796✔
3314
    if (code != 0) {
40,796✔
3315
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3316
      TAOS_RETURN(code);
×
3317
    }
3318
  }
3319

3320
  TAOS_RETURN(code);
40,796✔
3321
}
3322

3323
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3324
                                      SArray *pArray) {
3325
  int32_t code = 0;
×
3326
  SVgObj  newVgroup = {0};
×
3327
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3328

3329
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3330
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3331
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3332
    return 0;
×
3333
  }
3334

3335
  mndTransSetSerial(pTrans);
×
3336

3337
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3338
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3339

3340
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3341
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3342
          pVgroup->vnodeGid[0].dnodeId);
3343

3344
    // add second
3345
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3346
    // add third
3347
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3348

3349
    // add learner stage
3350
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3351
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3352
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3353
    TAOS_CHECK_RETURN(
×
3354
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3355
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3356
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3357
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3358
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3359
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3360
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3361
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3362
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3363

3364
    // check learner
3365
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3366
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3367
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3368
    TAOS_CHECK_RETURN(
×
3369
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3370
    TAOS_CHECK_RETURN(
×
3371
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3372

3373
    // change raft type
3374
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3375
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3376
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3377
    TAOS_CHECK_RETURN(
×
3378
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3379

3380
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3381

3382
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3383
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3384
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3385
    TAOS_CHECK_RETURN(
×
3386
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3387

3388
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3389

3390
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3391
    if (pVgRaw == NULL) {
×
3392
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3393
      if (terrno != 0) code = terrno;
×
3394
      TAOS_RETURN(code);
×
3395
    }
3396
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3397
      sdbFreeRaw(pVgRaw);
×
3398
      TAOS_RETURN(code);
×
3399
    }
3400
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3401
    if (code != 0) {
×
3402
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3403
             __LINE__);
3404
      TAOS_RETURN(code);
×
3405
    }
3406
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3407
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3408
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3409

3410
    SVnodeGid del1 = {0};
×
3411
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3412

3413
    TAOS_CHECK_RETURN(
×
3414
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3415

3416
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3417

3418
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3419

3420
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3421
    if (pVgRaw == NULL) {
×
3422
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3423
      if (terrno != 0) code = terrno;
×
3424
      TAOS_RETURN(code);
×
3425
    }
3426
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3427
      sdbFreeRaw(pVgRaw);
×
3428
      TAOS_RETURN(code);
×
3429
    }
3430
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3431
    if (code != 0) {
×
3432
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3433
             __LINE__);
3434
      TAOS_RETURN(code);
×
3435
    }
3436

3437
    SVnodeGid del2 = {0};
×
3438
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3439

3440
    TAOS_CHECK_RETURN(
×
3441
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3442

3443
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3444

3445
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3446

3447
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3448
    if (pVgRaw == NULL) {
×
3449
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3450
      if (terrno != 0) code = terrno;
×
3451
      TAOS_RETURN(code);
×
3452
    }
3453
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3454
      sdbFreeRaw(pVgRaw);
×
3455
      TAOS_RETURN(code);
×
3456
    }
3457
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3458
    if (code != 0) {
×
3459
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3460
             __LINE__);
3461
      TAOS_RETURN(code);
×
3462
    }
3463
  } else {
3464
    return -1;
×
3465
  }
3466

3467
  mndSortVnodeGid(&newVgroup);
×
3468

3469
  {
3470
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3471
    if (pVgRaw == NULL) {
×
3472
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3473
      if (terrno != 0) code = terrno;
×
3474
      TAOS_RETURN(code);
×
3475
    }
3476
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3477
      sdbFreeRaw(pVgRaw);
×
3478
      TAOS_RETURN(code);
×
3479
    }
3480
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3481
    if (code != 0) {
×
3482
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3483
             __LINE__);
3484
      TAOS_RETURN(code);
×
3485
    }
3486
  }
3487

3488
  TAOS_RETURN(code);
×
3489
}
3490

3491
// Build the transaction actions that restore the vnode of pVgroup located on pDnode.
//
// Works on a local copy of pVgroup whose replica roles are adjusted before each action is generated:
//   replica 1: every replica is a voter; a create-vnode action is added for the slot on pDnode
//              (slot 0 when no slot matches)
//   replica 2: the replica on pDnode is marked learner and the others voter; an alter-type action is
//              added for pAnotherDnode and a create action for pDnode; then all replicas are marked
//              voter and alter-type actions are added for pDnode and pAnotherDnode
//   replica 3: the replica on pDnode is marked learner and a create action is added for pDnode; then
//              all replicas are marked voter and an alter-type action is added for pDnode
// For any other replica count no action is added. In every case the resulting vgroup is appended to the
// commit log with status SDB_STATUS_READY.
//
// Returns 0 on success or the first error encountered.
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup;
  memcpy(&newVgroup, pVgroup, sizeof(newVgroup));
  SVnodeGid *pGid = newVgroup.vnodeGid;  // shorthand for the replica slots of the working copy

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  switch (newVgroup.replica) {
    case 1: {
      int selected = 0;
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
        if (pGid[i].dnodeId == pDnode->id) selected = i;
      }
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &pGid[selected]));
      break;
    }

    case 2: {
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = (pGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

      // the roles are written again before the create action, as the working copy was handed to the call above
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = (pGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
      break;
    }

    case 3: {
      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = (pGid[i].dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      for (int i = 0; i < newVgroup.replica; i++) {
        pGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      break;
    }

    default:
      break;
  }

  // persist the resulting vgroup in the commit log
  SSdbRaw *pRaw = mndVgroupActionEncode(&newVgroup);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3570

3571
/*
 * Placeholder hook: appends nothing to the transaction and always reports
 * success. None of the parameters are read.
 */
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;

  const int32_t result = 0;
  return result;
}
3574

3575
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3576

3577
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
91,098✔
3578
  int32_t         code = 0;
91,098✔
3579
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
91,098✔
3580
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
91,098✔
3581
  if (pRaw == NULL) {
91,098✔
3582
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3583
    if (terrno != 0) code = terrno;
×
3584
    goto _err;
×
3585
  }
3586
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
91,098✔
3587
  code = sdbSetRawStatus(pRaw, vgStatus);
91,098✔
3588
  if (code != 0) {
91,098✔
3589
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
3590
    goto _err;
×
3591
  }
3592
  pRaw = NULL;
91,098✔
3593
  TAOS_RETURN(code);
91,098✔
3594
_err:
×
3595
  sdbFreeRaw(pRaw);
×
3596
  TAOS_RETURN(code);
×
3597
}
3598

3599
/*
 * Encode pDb and append it to pTrans as a raw log entry with status dbStatus.
 *
 * stage selects the log: TRN_STAGE_COMMIT_ACTION appends to the commit log,
 * any other stage appends to the redo log.
 *
 * Returns 0 on success. If encoding fails the result is terrno when it is set,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append and set-status failures
 * return the failing call's code.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
  SSdbRaw        *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // The append was rejected, so the raw is still ours to release.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // The raw has now been handed to pTrans and must not be freed here: freeing
  // it would leave the transaction with a dangling pointer (presumably the raw
  // is released together with the transaction).
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3620

3621
/*
 * Build and prepare a serial "split-vgroup" transaction that replaces pVgroup
 * by two new vgroups, each covering half of the original hash range.
 *
 * Outline of the actions queued on the transaction:
 *   1. Bring the source vgroup to two replicas (grow from 1 or shrink from 3).
 *   2. Disable writes on every remaining vnode and confirm.
 *   3. Derive two single-replica vgroups: newVg1 keeps vnodeGid[0] and the
 *      lower half of the range, newVg2 takes vnodeGid[1] and the upper half.
 *      They receive the vgroup ids maxVgId and maxVgId + 1.
 *   4. Redo logs: both new vgroups READY, the source vgroup DROPPED, and a db
 *      copy with bumped vgVersion / numOfVgroups.
 *   5. Restore the db's configured replica count on each new vgroup (or just
 *      append its commit log when it already matches), then the commit logs
 *      for the dropped source vgroup and the db.
 *
 * Returns 0 once the transaction has been prepared, otherwise the error code
 * of the first failing step. The dnode array, the transaction handle and the
 * duplicated retention array are released on every path.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  // The dnode array is consumed by every replica adjustment below; fail early
  // instead of handing a NULL array to those helpers.
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  /*
    if (pDb->cfg.withArbitrator) {
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
      goto _OVER;
    }
  */

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  // Work on a private copy; pVgroup itself is only encoded as DROPPED later on.
  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  if (newVg1.replica == 1) {
    // Grow 1 -> 2: the new vnode joins as a learner and is then promoted to voter.
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    // Shrink 3 -> 2: drop one vnode and tell the two survivors about the new replica set.
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // Any other replica count is taken as-is, without a replica adjustment.
    // goto _OVER;
  }

  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // Derive the two single-replica halves from the adjusted vgroup.
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  // Halve both bounds separately rather than halving their sum.
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  // Iterate over newVg2's own replica count (was bounded by newVg1.replica).
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  // The shallow copy shares pRetensions with pDb, so duplicate the array:
  // _OVER destroys dbObj's copy and must not touch the one owned by pDb.
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3783

3784
// Real handler for split-vgroup requests; only the fallback stub below is
// defined in this file, and only when TD_ENTERPRISE is not defined.
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

// Message-handler entry point: forwards the request unchanged to the implementation.
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  return mndProcessSplitVgroupMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Fallback implementation: ignores the request and reports success.
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
3791

3792
/*
 * Queue on pTrans the actions that move one replica of pVgroup from dnode pSrc
 * to dnode pDst: first a replica is added on pDst, then the replica on pSrc is
 * removed, and finally the resulting vgroup is appended to the commit log with
 * status READY. pVgroup is not modified; all changes go to a local copy.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t code = 0;
  SVgObj  movedVg = {0};
  memcpy(&movedVg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", movedVg.vgId, movedVg.replica);
  for (int32_t idx = 0; idx < movedVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
  }

  // Add the destination replica before removing the source one.
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &movedVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &movedVg, pSrc->id));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&movedVg);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", movedVg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", movedVg.vgId, movedVg.replica);
  for (int32_t idx = 0; idx < movedVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
  }
  TAOS_RETURN(code);
}
3829

3830
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
15,640✔
3831
                                            SHashObj *pBalancedVgroups) {
3832
  void   *pIter = NULL;
15,640✔
3833
  int32_t code = -1;
15,640✔
3834
  SSdb   *pSdb = pMnode->pSdb;
15,640✔
3835

3836
  while (1) {
9,675✔
3837
    SVgObj *pVgroup = NULL;
25,315✔
3838
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
25,315✔
3839
    if (pIter == NULL) break;
25,315✔
3840
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
25,315✔
3841
      sdbRelease(pSdb, pVgroup);
8,865✔
3842
      continue;
8,865✔
3843
    }
3844

3845
    bool existInSrc = false;
16,450✔
3846
    bool existInDst = false;
16,450✔
3847
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
47,684✔
3848
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
31,234✔
3849
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
31,234✔
3850
      if (pGid->dnodeId == pDst->id) existInDst = true;
31,234✔
3851
    }
3852

3853
    if (!existInSrc || existInDst) {
16,450✔
3854
      sdbRelease(pSdb, pVgroup);
810✔
3855
      continue;
810✔
3856
    }
3857

3858
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
15,640✔
3859
    if (pDb == NULL) {
15,640✔
3860
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3861
      if (terrno != 0) code = terrno;
×
3862
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3863
      goto _OUT;
×
3864
    }
3865

3866
    if (pDb->cfg.withArbitrator) {
15,640✔
3867
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3868
      goto _OUT;
×
3869
    }
3870

3871
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
15,640✔
3872
    if (code == 0) {
15,640✔
3873
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
15,640✔
3874
    }
3875

3876
  _OUT:
15,640✔
3877
    mndReleaseDb(pMnode, pDb);
15,640✔
3878
    sdbRelease(pSdb, pVgroup);
15,640✔
3879
    sdbCancelFetch(pSdb, pIter);
15,640✔
3880
    break;
15,640✔
3881
  }
3882

3883
  return code;
15,640✔
3884
}
3885

3886
/*
 * Build a serial "balance-vgroup" transaction that repeatedly moves one vgroup
 * replica from the last dnode of the sorted pArray to the first one, for as
 * long as the score comparison below still favours a move.
 *
 * pArray holds SDnodeObj entries; it is re-sorted with mndCompareDnodeVnodes on
 * every round and the numOfVnodes counters of its entries are updated in place
 * after each queued move.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when at least one move was queued and
 * the transaction was prepared, 0 when nothing needed to be moved, and an
 * error code otherwise.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  // Ids of the vgroups already scheduled in this transaction.
  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // Arguments follow the label order: equivalent vnodes, others, support.
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    // After sorting, the last entry is the source and the first the destination.
    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Scores as they would be after moving one vnode from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Keep the prepare result: without it a failed prepare would be reported
    // with whatever the last loop iteration left in code, possibly 0.
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3954

3955
/*
 * Handle a balance-vgroup request.
 *
 * The request is rejected when it cannot be deserialized, when the caller
 * lacks MND_OPER_BALANCE_VGROUP, when any mount exists, or when any dnode is
 * offline. Otherwise the dnode array is built and, if it holds at least two
 * entries, handed to mndBalanceVgroup. An audit record is written when the
 * audit level is at least AUDIT_LEVEL_CLUSTER.
 *
 * Returns the result of mndBalanceVgroup, 0 when there are fewer than two
 * dnodes, or the error code of the failed check.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();  // reference time for the online check
  int64_t tss = taosGetTimestampMs();    // start time for the audit duration

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // Every dnode has to be online before anything is moved.
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Report the code chosen here; terrno has not been set on this path.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;  // milliseconds -> seconds
    auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
4024

4025
// True when pVgroup is not a TSMA vgroup and its dbUid equals dbUid.
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
135,713,520✔
4026

4027
// True when one of the first `replica` vnodes of pVgroup is placed on dnodeId.
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool found = false;
  for (int idx = 0; !found && idx < pVgroup->replica; idx++) {
    found = (pVgroup->vnodeGid[idx].dnodeId == dnodeId);
  }
  return found;
}
4033

4034
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
123,660✔
4035
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4036
                                     ETriggerType triggerType) {
4037
  SCompactVnodeReq compactReq = {0};
123,660✔
4038
  compactReq.dbUid = pDb->uid;
123,660✔
4039
  compactReq.compactStartTime = compactTs;
123,660✔
4040
  compactReq.tw = tw;
123,660✔
4041
  compactReq.metaOnly = metaOnly;
123,660✔
4042
  compactReq.force = force;
123,660✔
4043
  compactReq.optrType = type;
123,660✔
4044
  compactReq.triggerType = triggerType;
123,660✔
4045
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
123,660✔
4046

4047
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
123,660✔
4048
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
123,660✔
4049
  if (contLen < 0) {
123,660✔
4050
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4051
    return NULL;
×
4052
  }
4053
  contLen += sizeof(SMsgHead);
123,660✔
4054

4055
  void *pReq = taosMemoryMalloc(contLen);
123,660✔
4056
  if (pReq == NULL) {
123,660✔
4057
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4058
    return NULL;
×
4059
  }
4060

4061
  SMsgHead *pHead = pReq;
123,660✔
4062
  pHead->contLen = htonl(contLen);
123,660✔
4063
  pHead->vgId = htonl(pVgroup->vgId);
123,660✔
4064

4065
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
123,660✔
4066
    taosMemoryFree(pReq);
×
4067
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4068
    return NULL;
×
4069
  }
4070
  *pContLen = contLen;
123,660✔
4071
  return pReq;
123,660✔
4072
}
4073

4074
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
63,325✔
4075
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4076
                                        ETriggerType triggerType) {
4077
  int32_t      code = 0;
63,325✔
4078
  STransAction action = {0};
63,325✔
4079
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
63,325✔
4080

4081
  int32_t contLen = 0;
63,325✔
4082
  void   *pReq =
4083
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
63,325✔
4084
  if (pReq == NULL) {
63,325✔
4085
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4086
    if (terrno != 0) code = terrno;
×
4087
    TAOS_RETURN(code);
×
4088
  }
4089

4090
  action.pCont = pReq;
63,325✔
4091
  action.contLen = contLen;
63,325✔
4092
  action.msgType = TDMT_VND_COMPACT;
63,325✔
4093

4094
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
63,325✔
4095
    taosMemoryFree(pReq);
×
4096
    TAOS_RETURN(code);
×
4097
  }
4098

4099
  TAOS_RETURN(code);
63,325✔
4100
}
4101

4102
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
63,325✔
4103
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4104
                                    ETriggerType triggerType) {
4105
  TAOS_CHECK_RETURN(
63,325✔
4106
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4107
  return 0;
63,325✔
4108
}
4109

4110
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
60,335✔
4111
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4112
  int32_t      code = 0;
60,335✔
4113
  STransAction action = {0};
60,335✔
4114
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
60,335✔
4115

4116
  int32_t contLen = 0;
60,335✔
4117
  // reuse SCompactVnodeReq as SVTrimDbReq
4118
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
60,335✔
4119
  if (pReq == NULL) {
60,335✔
4120
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4121
    if (terrno != 0) code = terrno;
×
4122
    TAOS_RETURN(code);
×
4123
  }
4124

4125
  action.pCont = pReq;
60,335✔
4126
  action.contLen = contLen;
60,335✔
4127
  action.msgType = TDMT_VND_TRIM;
60,335✔
4128

4129
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
60,335✔
4130
    taosMemoryFree(pReq);
×
4131
    TAOS_RETURN(code);
×
4132
  }
4133

4134
  TAOS_RETURN(code);
60,335✔
4135
}
4136

4137
/*
 * Handle a request to set the keep version of one vgroup.
 *
 * A serial "set-vgroup-keep-version" transaction is built with
 *   - a commit log carrying a copy of the vgroup with the new keepVersion and
 *     the current time as keepVersionTime, and
 *   - one TDMT_VND_SET_KEEP_VERSION redo action per replica, addressed to the
 *     dnode hosting that replica (TSDB_CODE_VND_STOPPED is accepted).
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, or an
 * error code. The vgroup reference and the transaction handle are released on
 * every path.
 */
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup (released at _OVER unless it was released earlier)
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Update SVgObj's keepVersion in mnode
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Append the updated vgroup to the transaction's commit log
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // The append was rejected, so the raw is still ours to release.
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  // The raw has now been handed to pTrans and must not be freed here (presumably
  // it is released together with the transaction).
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    // Do not derive an allocation size from a failed measurement.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Header fields are stored in network byte order.
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    // Use the returned code; terrno may still be 0 on this path.
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // Release before preparing, and clear the pointer so _OVER does not release twice.
  mndReleaseVgroup(pMnode, pVgroup);
  pVgroup = NULL;

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc