• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.74
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndEncryptAlgr.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTopic.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "tmisce.h"
32

33
#define VGROUP_VER_NUMBER   1
34
#define VGROUP_RESERVE_SIZE 60
35

36
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
38
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
39
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
40

41
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
43
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
44
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
45

46
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
49
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
50
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
51

52
// Installs every message handler and show handler owned by the vgroup module
// and registers the SDB_VGROUP table descriptor with the sdb.
//
// pMnode: mnode instance whose handler tables and sdb are updated.
// Returns the result of sdbSetTable() for the vgroup table.
int32_t mndInitVgroup(SMnode *pMnode) {
  // Response messages that are all handed to the generic transaction
  // response handler.
  const int32_t transRspTypes[] = {
      TDMT_DND_CREATE_VNODE_RSP,
      TDMT_VND_ALTER_REPLICA_RSP,
      TDMT_VND_ALTER_CONFIG_RSP,
      TDMT_VND_ALTER_CONFIRM_RSP,
      TDMT_VND_SET_KEEP_VERSION_RSP,
      TDMT_VND_ALTER_HASHRANGE_RSP,
      TDMT_DND_DROP_VNODE_RSP,
      TDMT_VND_COMPACT_RSP,
      TDMT_VND_SCAN_RSP,
      TDMT_VND_DISABLE_WRITE_RSP,
      TDMT_SYNC_FORCE_FOLLOWER_RSP,
      TDMT_VND_ALTER_ELECTBASELINE_RSP,
      TDMT_DND_ALTER_VNODE_TYPE_RSP,
      TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP,
      TDMT_SYNC_CONFIG_CHANGE_RSP,
  };
  const int32_t numOfTransRsp = (int32_t)(sizeof(transRspTypes) / sizeof(transRspTypes[0]));

  for (int32_t idx = 0; idx < numOfTransRsp; ++idx) {
    mndSetMsgHandle(pMnode, transRspTypes[idx], mndTransProcessRsp);
  }

  // Requests processed by this module.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  // Retrieval and iterator-cancel callbacks for the vgroup and vnode show tables.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  // Table descriptor: int32 key plus the encode/decode/insert/update/delete/
  // validate callbacks defined in this file.
  SSdbTable vgroupTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
95

96
// Cleanup hook of the vgroup module. The body is intentionally empty: this
// function releases nothing.
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}
503,220✔
97

98
// Serializes a vgroup object into a freshly allocated sdb raw record.
//
// Field order written: vgId, createdTime, updateTime, version, hashBegin,
// hashEnd, dbName, dbUid, isTsma, replica, one dnodeId per replica,
// syncConfChangeVer, mountVgId, keepVersion, keepVersionTime, then
// VGROUP_RESERVE_SIZE reserved bytes.
//
// Returns the raw record, or NULL on failure. terrno is preset to
// TSDB_CODE_OUT_OF_MEMORY and only cleared once every field has been written.
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  // NOTE(review): code/lino are not used directly below; presumably the
  // SDB_SET_* macros reference them by name - confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)

  // Only the dnode id of each member is persisted.
  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[idx].dnodeId, _OVER)
  }

  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
140

141
// Rebuilds a vgroup row from an sdb raw record.
//
// The soft version must lie in [1, VGROUP_VER_NUMBER]; anything else fails with
// TSDB_CODE_SDB_INVALID_DATA_VER. syncConfChangeVer, keepVersion and
// keepVersionTime are read only when enough payload remains in front of the
// reserved area; mountVgId is always read.
//
// Returns the new row, or NULL on failure (the row is freed and terrno is
// left non-zero).
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly below; presumably the
  // SDB_GET_* macros reference them by name - confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int8_t   sver = 0;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (!(sver >= 1 && sver <= VGROUP_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDB_GET_INT32(pRaw, dataPos, &pGid->dnodeId, _OVER)
    // The only member of a single-replica vgroup is marked as leader right away.
    if (pVgroup->replica == 1) {
      pGid->syncState = TAOS_SYNC_STATE_LEADER;
    }
  }

  // Optional trailing fields: each is read only if it still fits before the
  // reserved bytes.
  if (dataPos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersion, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER)
  }
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRow;
  }

  mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
204

205
// Validation callback for a new vgroup record.
//
// Decodes pRaw into a temporary row and rejects it when the current maximum
// vgroup id in the sdb is greater than the decoded vgId.
//
// Returns 0 when the record is accepted, -1 when the id check fails, and
// otherwise terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is 0) when
// the record cannot be decoded. The temporary row is always freed.
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  int      code = -1;
  SVgObj  *pVgroup = NULL;
  SSdbRow *pRow = mndVgroupActionDecode(pRaw);

  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }

  if (pVgroup == NULL) {
    // Either the decode or the row-object lookup failed.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  } else {
    int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
    if (maxVgId > pVgroup->vgId) {
      mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
    } else {
      code = 0;
    }
    mndVgroupActionDelete(pMnode->pSdb, pVgroup);
  }

  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
236

237
// Insert callback for vgroup rows: emits a trace line and always reports
// success (0). pSdb is not used.
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t result = 0;

  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);

  return result;
}
241

242
// Delete callback for vgroup rows: emits a trace line and always reports
// success (0). pSdb is not used and nothing is freed here.
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t result = 0;

  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);

  return result;
}
246

247
// Update callback for vgroup rows: merges a newly committed row (pNew) into
// the row currently stored in the sdb (pOld).
//
// Steps, in order:
//   1. Persisted attributes (times, version, hash range, replica count, tsma
//      flag, keep-version data) are copied pNew -> pOld.
//   2. For every member of pNew that also existed in pOld (same dnodeId), the
//      sync and buffer-segment fields are copied pOld -> pNew so they survive
//      step 4.
//   3. The table/storage/compact counters are copied pOld -> pNew.
//   4. The whole vnodeGid array, syncConfChangeVer and dbName are copied
//      pNew -> pOld.
//
// Always returns 0. pSdb is not used.
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Capture the old member count before pOld->replica is overwritten below.
  // Using the overwritten value as the bound over the old member list would
  // skip old members whenever the replica count shrinks.
  const int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;

  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
      }
    }
  }

  // Counters live only in the stored row; push them into pNew as well.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  // The full array is copied, not just the first `replica` entries.
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);
  return 0;
}
283

284
// Acquires the vgroup with the given id from the sdb.
//
// Returns the acquired object, or NULL on failure. When the failure is
// TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
// TSDB_CODE_MND_VGROUP_NOT_EXIST; any other terrno is left as is.
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
292

293
// Hands a vgroup object back to the sdb by forwarding it to sdbRelease().
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
104,889,432✔
297

298
// Builds the serialized create-vnode request for one member of a vgroup.
//
// pMnode:   used to look up the dnode of every vgroup member and the
//           encryption algorithm name.
// pDnode:   dnode that will receive the request; it must be one of the
//           members of pVgroup.
// pDb:      database whose configuration is copied into the request.
// pVgroup:  vgroup being created. NOTE: its syncConfChangeVer is incremented
//           as a side effect, even when a later step fails.
// pContLen: receives the length of the returned buffer on success.
//
// Members with role TAOS_SYNC_ROLE_VOTER go into replicas[], all others into
// learnerReplicas[]; selfIndex/learnerSelfIndex are the position of pDnode
// inside the respective array.
//
// Returns a buffer allocated with taosMemoryMalloc(), or NULL on failure. NULL
// is returned with terrno set to TSDB_CODE_APP_ERROR (pDnode is not a member,
// or serialization failed) or TSDB_CODE_OUT_OF_MEMORY (size computation or
// allocation failed); when a member dnode cannot be acquired, NULL is returned
// without touching terrno here.
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;
  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;

  // Bumped up-front, so the counter advances even if a later step fails; the
  // request field itself is filled in once the replica lists are complete.
  ++(pVgroup->syncConfChangeVer);

  memset(createReq.encryptAlgrName, 0, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pDb->cfg.encryptAlgorithm > 0) {
    mndGetEncryptOsslAlgrNameById(pMnode, pDb->cfg.encryptAlgorithm, createReq.encryptAlgrName);
  }
  int32_t code = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid   *pVgid = &pVgroup->vnodeGid[v];
    const int8_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica    *pReplica = NULL;

    if (isVoter) {
      pReplica = &createReq.replicas[createReq.replica];
    } else {
      pReplica = &createReq.learnerReplicas[createReq.learnerReplica];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // Record the target dnode's slot before advancing the per-role counter.
    if (isVoter) {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  // The target dnode must appear in one of the two lists.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  createReq.changeVersion = pVgroup->syncConfChangeVer;

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    // Learners get their own label so they cannot be confused with voters
    // that share the same index.
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  // First pass computes the encoded size, second pass fills the buffer.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
431

432
// Builds the alter-vnode-config message for a vgroup: an SMsgHead followed by
// the serialized SAlterVnodeConfigReq filled from the database configuration.
//
// pMnode:   not used.
// pDb:      database whose configuration is copied into the request.
// pVgroup:  supplies vgVersion and the vgId stored in the message head.
// pContLen: receives the total message length (head + body) on success.
//
// Returns a buffer allocated with taosMemoryMalloc(), or NULL with terrno set
// to TSDB_CODE_OUT_OF_MEMORY on failure.
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // Size of the serialized request body alone (no message head).
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Head fields are stored in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts after the head, so only bodyLen bytes are available to
  // the serializer; passing contLen would overstate the buffer by the size of
  // the head.
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
482

483
// Builds the serialized alter-vnode-replica request addressed to one member
// of a vgroup.
//
// pMnode:   used to look up the dnode of every vgroup member.
// pDb:      supplies the strict flag.
// pVgroup:  vgroup whose member list is sent. NOTE: its syncConfChangeVer is
//           incremented as a side effect, even when a later step fails.
// dnodeId:  dnode the request is addressed to; it must be a member.
// pContLen: receives the length of the returned buffer on success.
//
// Members with role TAOS_SYNC_ROLE_VOTER go into replicas[], all others into
// learnerReplicas[].
//
// Returns a buffer allocated with taosMemoryMalloc(), or NULL on failure
// (terrno is TSDB_CODE_APP_ERROR when dnodeId is not a member,
// TSDB_CODE_OUT_OF_MEMORY for size/allocation/serialization failures, and is
// not set here when a member dnode cannot be acquired).
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.strict = pDb->cfg.strict;
  alterReq.selfIndex = -1;
  alterReq.learnerSelfIndex = -1;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid   *pVgid = &pVgroup->vnodeGid[v];
    const int8_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica    *pReplica = NULL;

    // Claim the next free slot of the list that matches the member's role.
    if (isVoter) {
      pReplica = &alterReq.replicas[alterReq.replica];
      alterReq.replica++;
    } else {
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
      alterReq.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // NOTE(review): the self index is the position v in vnodeGid[], whereas
    // mndBuildCreateVnodeReq stores the position inside the per-role list.
    // The two differ once voters and learners are mixed - confirm which one
    // the receiver expects.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = v;
      } else {
        alterReq.learnerSelfIndex = v;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  // The addressed dnode must have been found in one of the two lists.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass computes the encoded size, second pass fills the buffer.
  void   *pReq = NULL;
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen >= 0) {
    pReq = taosMemoryMalloc(contLen);
  }
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) >= 0) {
    *pContLen = contLen;
    return pReq;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
  taosMemoryFree(pReq);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
565

566
// Builds the serialized check-learner-catchup request addressed to one member
// of a vgroup. The request carries the vgroup's voter and learner lists and
// is encoded with tSerializeSAlterVnodeReplicaReq().
//
// pMnode:   used to look up the dnode of every vgroup member.
// pDb:      supplies the strict flag.
// pVgroup:  vgroup whose member list is sent (not modified here).
// dnodeId:  dnode the request is addressed to; it must be a member.
// pContLen: receives the length of the returned buffer on success.
//
// Returns a buffer allocated with taosMemoryMalloc(), or NULL on failure
// (terrno is TSDB_CODE_APP_ERROR when dnodeId is not a member,
// TSDB_CODE_OUT_OF_MEMORY for size/allocation/serialization failures, and is
// not set here when a member dnode cannot be acquired).
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {0};
  req.vgId = pVgroup->vgId;
  req.strict = pDb->cfg.strict;
  req.selfIndex = -1;
  req.learnerSelfIndex = -1;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid   *pVgid = &pVgroup->vnodeGid[v];
    const int8_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica    *pReplica = NULL;

    // Claim the next free slot of the list that matches the member's role.
    if (isVoter) {
      pReplica = &req.replicas[req.replica];
      req.replica++;
    } else {
      pReplica = &req.learnerReplicas[req.learnerReplica];
      req.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // NOTE(review): the self index is the position v in vnodeGid[], not the
    // position inside the per-role list - confirm which one the receiver
    // expects.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = v;
      } else {
        req.learnerSelfIndex = v;
      }
    }
  }

  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  // The addressed dnode must have been found in one of the two lists.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass computes the encoded size, second pass fills the buffer.
  void   *pReq = NULL;
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen >= 0) {
    pReq = taosMemoryMalloc(contLen);
  }
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) >= 0) {
    *pContLen = contLen;
    return pReq;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
  taosMemoryFree(pReq);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
643

644
// Builds the serialized disable-write request (disable = 1) for a vgroup.
//
// pMnode, pDb: not used.
// vgId:        vgroup id stored in the request.
// pContLen:    receives the length of the returned buffer on success.
//
// Returns a buffer allocated with taosMemoryMalloc(), or NULL with terrno set
// to TSDB_CODE_OUT_OF_MEMORY on failure.
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {0};
  disableReq.vgId = vgId;
  disableReq.disable = 1;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // First pass computes the encoded size, second pass fills the buffer.
  void   *pReq = NULL;
  int32_t contLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
  if (contLen >= 0) {
    pReq = taosMemoryMalloc(contLen);
  }
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pReq, contLen, &disableReq) >= 0) {
    *pContLen = contLen;
    return pReq;
  }

  mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
  taosMemoryFree(pReq);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
672

673
// Builds the serialized "alter vnode hash range" request that moves `srcVgId` onto the
// hash range of `pVgroup`.
// Side effect: pVgroup->syncConfChangeVer is incremented and the new value is sent as changeVersion.
// Returns a buffer obtained from taosMemoryMalloc and stores its length in *pContLen.
// On any failure returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY; *pContLen is not written.
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq rangeReq = {0};
  rangeReq.srcVgId = srcVgId;
  rangeReq.dstVgId = pVgroup->vgId;
  rangeReq.hashBegin = pVgroup->hashBegin;
  rangeReq.hashEnd = pVgroup->hashEnd;
  rangeReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // Called with a NULL buffer first; the result is used as the allocation size.
  int32_t bufLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &rangeReq);
  void   *pBuf = NULL;
  if (bufLen >= 0) {
    pBuf = taosMemoryMalloc(bufLen);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, bufLen, &rangeReq) >= 0) {
    *pContLen = bufLen;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
705

706
// Builds the serialized "drop vnode" request for the replica of `pVgroup` on `pDnode`.
// The request carries the dnode id, vgroup id, database full name and database uid.
// Returns a buffer obtained from taosMemoryMalloc and stores its length in *pContLen.
// On any failure returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY; *pContLen is not written.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);

  // Called with a NULL buffer first; the result is used as the allocation size.
  int32_t bufLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  void   *pBuf = NULL;
  if (bufLen >= 0) {
    pBuf = taosMemoryMalloc(bufLen);
  }
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, bufLen, &dropReq) >= 0) {
    *pContLen = bufLen;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
735

736
// sdbTraverse callback: zeroes the per-dnode vnode/other-node counters before they are recomputed.
// Always returns true; p1..p3 are unused.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pTarget = (SDnodeObj *)pObj;

  pTarget->numOfOtherNodes = 0;
  pTarget->numOfVnodes = 0;
  return true;
}
742

743
// sdbTraverse callback that collects candidate dnodes.
//   pObj : the SDnodeObj being visited
//   p1   : SArray that receives a copy of every accepted dnode
//   p2   : pointer to the int32_t id of a dnode to skip
//   p3   : optional SArray whose elements are read as int32_t dnode ids; when non-NULL only
//          listed dnodes are considered, and an empty list rejects every dnode (TS-6191)
// For each dnode that passes the filters, numOfVnodes and memUsed are refreshed and
// numOfOtherNodes is bumped if the dnode hosts an mnode. Only online dnodes with
// numOfSupportVnodes > 0 are pushed to p1.
// Returns false only when pushing to p1 fails; true otherwise.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;
  int32_t    skipId = *(int32_t *)p2;
  SArray    *pAllowed = p3;

  if (pDnode->id == skipId) {
    return true;
  }

  if (pAllowed != NULL) {
    int32_t nAllowed = taosArrayGetSize(pAllowed);
    if (nAllowed <= 0) {
      return true;  // TS-6191
    }

    bool listed = false;
    for (int32_t k = 0; k < nAllowed && !listed; ++k) {
      listed = (*(int32_t *)taosArrayGet(pAllowed, k) == pDnode->id);
    }
    if (!listed) {
      return true;
    }
  }

  int64_t nowMs = taosGetTimestampMs();
  bool    isOnline = mndIsDnodeOnline(pDnode, nowMs);
  bool    hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, hostsMnode, isOnline, pDnode->memAvail, pDnode->memUsed);

  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (!isOnline || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
789

790
// Returns true when `dnodeId` equals one of the int32_t elements stored in `dnodeList`.
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t remaining = taosArrayGetSize(dnodeList);
  int32_t pos = 0;

  while (pos < remaining) {
    int32_t *pId = (int32_t *)TARRAY_GET_ELEM(dnodeList, pos);
    if (*pId == dnodeId) {
      return true;
    }
    ++pos;
  }
  return false;
}
800

801
#ifdef TD_ENTERPRISE
802
// Load score used by the dual-replica candidate ordering:
//   (numOfVnodes + numOfOtherNodes * ratio + additionDnodes) / numOfSupportVnodes
// The score is negated for dnodes that already host vnodes, so they compare lower than
// dnodes without any vnode.
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;

  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
807

808
// Comparator for taosArraySort: orders dnodes by ascending mndGetDnodeScore1 (ratio 0.9),
// breaking ties by ascending dnode id. Returns -1, 0 or 1.
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);

  if (lhs != rhs) {
    return lhs > rhs ? 1 : -1;
  }

  // equal scores: fall back to the dnode id
  if (pDnode1->id == pDnode2->id) {
    return 0;
  }
  return pDnode1->id > pDnode2->id ? 1 : -1;
}
819

820
// sdbTraverse callback: refreshes numOfVnodes for the visited dnode, counts a hosted mnode in
// numOfOtherNodes, and pushes a copy of the dnode to the SArray in p1 when numOfSupportVnodes > 0.
// Unlike mndBuildDnodesArrayFp, online state is not checked. p2 and p3 are unused.
// Returns false only when the push fails.
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;

  bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
836

837
// TS-6191
838
// Builds the restricted dnode candidate list used when grantCheckDualReplicaDnodes() is true.
// When the check is false, returns 0 and leaves *ppDnodeList untouched.
// Otherwise *ppDnodeList receives a new SArray of SDnodeObj (owned by the caller) that is
// sorted with mndCompareDnodeVnodes1 and then trimmed:
//   - no dnodeList given: keep at most the first two dnodes;
//   - dnodeList given: if at least two leading dnodes already host vnodes keep only the first
//     two, then drop every dnode not named in dnodeList; if exactly one leading dnode hosted
//     vnodes and it was dropped by that filter, the last remaining element is removed too.
// Returns 0 on success or terrno when the array cannot be allocated.
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCandidates = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCandidates;

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCandidates, NULL, NULL);

  int32_t total = taosArrayGetSize(pCandidates);
  if (total <= 0) {
    TAOS_RETURN(code);
  }
  if (total > 1) taosArraySort(pCandidates, (__compar_fn_t)mndCompareDnodeVnodes1);

  int32_t nRequested = taosArrayGetSize(dnodeList);
  if (nRequested <= 0) {
    if (total > 2) taosArrayRemoveBatch(pCandidates, 2, total - 2, NULL);
    TAOS_RETURN(code);
  }

  // count the leading run of dnodes that already host vnodes
  int32_t nBusy = 0;
  while (nBusy < total) {
    SDnodeObj *pDnode = TARRAY_GET_ELEM(pCandidates, nBusy);
    if (pDnode->numOfVnodes <= 0) {
      break;
    }
    ++nBusy;
  }

  int32_t soleBusyId = -1;
  if (nBusy == 1) {
    soleBusyId = ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, 0))->id;
  } else if (nBusy >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCandidates, 2, total - 2, NULL);
  }

  // keep only the dnodes named in dnodeList; the index advances only when nothing was removed
  int32_t pos = 0;
  while (pos < TARRAY_SIZE(pCandidates)) {
    SDnodeObj *pDnode = taosArrayGet(pCandidates, pos);
    if (isDnodeInList(dnodeList, pDnode->id)) {
      ++pos;
    } else {
      taosArrayRemove(pCandidates, pos);
    }
  }

  if (nBusy == 1) {
    SDnodeObj *pFirst = taosArrayGet(pCandidates, 0);
    if (pFirst != NULL && pFirst->id != soleBusyId) {
      // the first dnode is not in dnodeList, remove the last element
      taosArrayRemove(pCandidates, taosArrayGetSize(pCandidates) - 1);
    }
  }

  TAOS_RETURN(code);
}
896
#endif
897

898
// Returns a new SArray of SDnodeObj copies describing the dnodes that can receive vnodes.
//   exceptDnodeId : id of a dnode to leave out
//   dnodeList     : optional list of dnode ids to restrict the selection to; NULL or empty
//                   means no restriction
// In enterprise builds the restriction list may be replaced by the one produced by
// mndBuildNodesCheckDualReplica. The caller owns the returned array.
// Returns NULL on failure (terrno is set to TSDB_CODE_OUT_OF_MEMORY when the result array
// itself cannot be allocated).
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
  SArray *pDualList = NULL;

  SArray *pResult = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // an empty caller list is treated the same as no list at all
  SArray *pUserList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;

#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, pUserList, &pDualList) != 0) {
    taosArrayDestroy(pResult);
    return NULL;
  }
#endif

  SArray *pFilter = (pDualList != NULL) ? pDualList : pUserList;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId, pFilter);

  mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pResult));
  int32_t idx = 0;
  while (idx < (int32_t)taosArrayGetSize(pResult)) {
    SDnodeObj *pDnode = taosArrayGet(pResult, idx);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
    ++idx;
  }

  taosArrayDestroy(pDualList);
  return pResult;
}
929

930
// Three-way comparison of two dnode ids passed by pointer: returns -1, 0 or 1.
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  int32_t lhs = *dnode1Id;
  int32_t rhs = *dnode2Id;

  if (lhs > rhs) {
    return 1;
  }
  if (lhs < rhs) {
    return -1;
  }
  return 0;
}
936

937
// Load score of a dnode:
//   (numOfVnodes + numOfOtherNodes * ratio + additionDnodes) / numOfSupportVnodes
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;
  return score;
}
941

942
// Comparator for taosArraySort: orders dnodes by ascending mndGetDnodeScore (ratio 0.9).
// Returns 0 for equal scores, otherwise 1 or -1.
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);

  if (lhs != rhs) {
    return lhs > rhs ? 1 : -1;
  }
  return 0;
}
950

951
// Sorts pVgroup->vnodeGid[0..replica) in place by ascending dnodeId (bubble sort).
void mndSortVnodeGid(SVgObj *pVgroup) {
  // `last` is the index of the final slot that is still unsorted
  for (int32_t last = pVgroup->replica - 1; last >= 0; --last) {
    for (int32_t k = 0; k < last; ++k) {
      if (pVgroup->vnodeGid[k].dnodeId <= pVgroup->vnodeGid[k + 1].dnodeId) {
        continue;
      }
      TSWAP(pVgroup->vnodeGid[k], pVgroup->vnodeGid[k + 1]);
    }
  }
}
2,621,826✔
960

961
// Assigns the replicas of `pVgroup` to the least-loaded dnodes in `pArray`.
// `pArray` (SDnodeObj elements) is sorted in place by mndCompareDnodeVnodes and the first
// `replica` entries are used. For each chosen dnode, memUsed and numOfVnodes in the array
// copy are increased so that later calls see the updated load.
// On success vnodeGid[] is filled (sorted by dnode id) and 0 is returned. Errors:
//   TSDB_CODE_MND_NO_ENOUGH_DNODES       - fewer dnodes than replicas
//   TSDB_CODE_MND_NO_ENOUGH_VNODES       - a chosen dnode is already at numOfSupportVnodes
//   TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE - a chosen dnode lacks memory for the vgroup
// NOTE: counters of dnodes processed before a failure stay modified.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  int32_t nDnodes = taosArrayGetSize(pArray);

  mDebug("start to sort %d dnodes", nDnodes);
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t k = 0; k < nDnodes; ++k) {
    SDnodeObj *pEntry = taosArrayGet(pArray, k);
    mDebug("dnode:%d, score:%f", pEntry->id, mndGetDnodeScore(pEntry, 0, 0.9));
  }

  if (nDnodes < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, nDnodes,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  // the sort keeps the lowest score first, so replica r takes the dnode at index r
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SDnodeObj *pDnode = taosArrayGet(pArray, r);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t needMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - needMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, needMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += needMem;

    SVnodeGid *pSlot = &pVgroup->vnodeGid[r];
    pSlot->dnodeId = pDnode->id;
    // a single-replica vgroup starts as leader; otherwise every replica starts as follower
    pSlot->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, r, needMem, pSlot->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1010

1011
// Initializes `pVgroup` as a new single-replica TSMA vgroup of `pDb` and picks a dnode for it.
// The vgroup id is taken from sdbGetMaxId(SDB_VGROUP); WAL keep-version is disabled by default.
// Returns 0 on success. Returns the mndBuildDnodesArray failure code when the dnode array
// cannot be built, or -1 (with terrno set by mndGetAvailableDnode) when no dnode is available.
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  // The dnode array is only needed for the selection; release it on both the success and
  // the failure path (it used to leak when the selection failed).
  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  taosArrayDestroy(pArray);
  if (code != 0) return -1;

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1037

1038
// Allocates and initializes pDb->cfg.numOfVgroups vgroups for `pDb`.
// The uint32 hash space [0, UINT32_MAX] is split into equal intervals, one per vgroup; the
// last vgroup absorbs the remainder. Vgroup ids start at max(sdbGetMaxId(SDB_VGROUP), 2).
// Each vgroup's replicas are placed with mndGetAvailableDnode on the dnodes returned by
// mndBuildDnodesArray (optionally restricted to `dnodeList`).
// On success returns 0 and hands the array to the caller through *ppVgroups (caller frees).
// On failure returns an error code, frees the array and leaves *ppVgroups untouched.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // use this vgroup's own creation time (previously the first array element's was copied)
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      // the last vgroup takes whatever is left so the whole hash space is covered
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1103

1104
// Builds the endpoint set of `pVgroup`: one endpoint (fqdn:port) per replica whose dnode can
// be acquired. inUse is pointed at the replica reported as leader or assigned leader, and the
// set is passed through epsetSort before being returned. Replicas whose dnode cannot be
// acquired are skipped; a failed add is only logged.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet result = {0};

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode != NULL) {
      bool leads = (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leads) {
        // the endpoint about to be appended gets index numOfEps
        result.inUse = result.numOfEps;
      }

      if (addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  epsetSort(&result);
  return result;
}
1125

1126
// Looks up vgroup `vgId` and builds its endpoint set: one endpoint (fqdn:port) per replica
// whose dnode can be acquired, with inUse pointing at the leader / assigned leader.
// Returns an all-zero epset when the vgroup cannot be acquired.
// NOTE: unlike mndGetVgroupEpset, the result is not passed through epsetSort.
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet  result = {0};
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);

  if (pVgroup == NULL) {
    return result;
  }

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode != NULL) {
      bool leads = (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leads) {
        // the endpoint about to be appended gets index numOfEps
        result.inUse = result.numOfEps;
      }

      if (addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return result;
}
1150

1151
// Show-retrieve callback for vgroups: appends up to `rows` rows to `pBlock`, resuming from
// pShow->pIter. When pShow->db is non-empty only vgroups of that database are listed.
// Columns, in order: vgId, db name, numOfTables, then for each of 4 replica slots
// (dnode id, role/status string, "applied/commit" index string; NULL for unused slots),
// cacheUsage, numOfCachedTables, isTsma, mountVgId, keepVersion, keepVersionTime.
// Returns the number of rows written (0 if the database cannot be acquired), or an error
// code when a column cannot be set or the db name cannot be parsed.
// Every failure inside the loop leaves through _OVER so that the database reference is
// always released.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  int64_t curMs = taosGetTimestampMs();
  int32_t code = 0, lino = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      lino = __LINE__;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      goto _OVER;  // releases pDb; a direct return here used to leak it
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i < pVgroup->replica) {
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
        COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

        bool       exist = false;
        bool       online = false;
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
        if (pDnode != NULL) {
          exist = true;
          online = mndIsDnodeOnline(pDnode, curMs);
          mndReleaseDnode(pMnode, pDnode);
        }

        // role: "dropping" when the dnode no longer exists, "offline" when it is not online,
        // otherwise the sync state name; a leader that has not finished restoring gets a
        // "**" suffix (cannot read yet) or "*" (can read).
        char role[20] = "offline";
        // sized so that the whole role text plus the varstr header always fits
        char buf1[sizeof(role) + VARSTR_HEADER_SIZE] = {0};
        if (!exist) {
          tstrncpy(role, "dropping", sizeof(role));
        } else if (online) {
          char *star = "";
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
            if (!pVgroup->vnodeGid[i].syncRestore && !pVgroup->vnodeGid[i].syncCanRead) {
              star = "**";
            } else if (!pVgroup->vnodeGid[i].syncRestore && pVgroup->vnodeGid[i].syncCanRead) {
              star = "*";
            }
          }
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
        }
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
                 pVgroup->vnodeGid[i].syncCommitIndex);
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
      } else {
        // unused replica slot: all three of its columns are NULL
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
      }
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->keepVersion, false);
    if (code != 0) {
      mError("vgId:%d, failed to set keepVersion, since %s", pVgroup->vgId, tstrerror(code));
      lino = __LINE__;
      // same cleanup as the db-name failure path above: drop the row object and the iterator
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      goto _OVER;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->keepVersionTime, false);
    if (code != 0) {
      mError("vgId:%d, failed to set keepVersionTime, since %s", pVgroup->vgId, tstrerror(code));
      lino = __LINE__;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      goto _OVER;
    }

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1319

1320
// Cancels an in-progress vgroup iteration by handing `pIter` to sdbCancelFetchByType.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1324

1325
// sdbTraverse callback: adds to *(int32_t *)p2 the number of replicas of the visited vgroup
// that are placed on the dnode whose id is *(int32_t *)p1. Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  const SVgObj *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int32_t      *pCount = p2;

  int32_t slot = 0;
  while (slot < pVg->replica) {
    if (pVg->vnodeGid[slot].dnodeId == wantedDnode) {
      *pCount += 1;
    }
    ++slot;
  }

  return true;
}
1338

1339
// Returns how many vnode replicas are placed on dnode `dnodeId`, counted by traversing
// every vgroup in the sdb.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t total = 0;

  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);
  return total;
}
1344

1345
// Computes the memory, in bytes, accounted to one vnode of `pVgroup` from its database config:
//   buffer * 1 MiB + pages * pageSize * 1 KiB (+ cacheLastSize * 1 MiB when cacheLast > 0)
// When `pDbInput` is NULL the database is acquired by pVgroup->dbName and released before
// returning. Returns 0 if no database object is available.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool acquiredHere = (pDbInput == NULL);
  SDbObj    *pDb = acquiredHere ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;
  int64_t    totalBytes = 0;

  if (pDb != NULL) {
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;

    totalBytes = buffer + cache;
    if (pDb->cfg.cacheLast > 0) {
      totalBytes += cacheLast;
    }
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  if (acquiredHere) {
    mndReleaseDb(pMnode, pDb);
  }
  return totalBytes;
}
1369

1370
// sdbTraverse callback: for every replica of the visited vgroup placed on the dnode whose id
// is *(int32_t *)p1, adds mndGetVgroupMemory() of that vgroup to *(int64_t *)p2.
// Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int64_t      *pTotal = p2;

  int32_t slot = 0;
  while (slot < pVg->replica) {
    if (pVg->vnodeGid[slot].dnodeId == wantedDnode) {
      *pTotal += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
    ++slot;
  }

  return true;
}
1383

1384
// Returns the sum of mndGetVgroupMemory() over all vnode replicas placed on dnode `dnodeId`,
// computed by traversing every vgroup in the sdb.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t totalBytes = 0;

  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &totalBytes, NULL);
  return totalBytes;
}
1389

1390
// Formats the estimated time needed to apply `applyCount` entries at `rate` entries per
// millisecond as "hours:minutes:seconds" (no zero padding, hours unbounded) into restoreStr.
//   rate           : apply rate; the quotient applyCount / rate is treated as milliseconds
//   applyCount     : number of entries still to apply
//   restoreStr     : output buffer
//   restoreStrSize : size of the output buffer in bytes
// Writes "0:0:0" when no meaningful estimate exists: rate is zero, negative or NaN, or
// applyCount is not positive. An estimate too large for int64_t is clamped to INT64_MAX ms
// instead of performing an out-of-range floating-point to integer conversion.
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  // `!(rate > 0)` is also true for NaN, which `rate == 0` would let through
  if (!(rate > 0) || applyCount <= 0) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  double  costMs = (double)applyCount / rate;
  // (double)INT64_MAX is 2^63; anything below it converts to int64_t without overflow
  int64_t costTime = (costMs < (double)INT64_MAX) ? (int64_t)costMs : INT64_MAX;

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1404

1405
/* Writes one cell (column `col`, row `row`) of the vnode listing and logs a failure. */
static int32_t mndSetVnodeShowCol(SSDataBlock *pBlock, int32_t col, int32_t row, const void *pData, int32_t vgId,
                                  const char *what) {
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, col);
  int32_t          code = colDataSetVal(pColInfo, row, (const char *)pData, false);
  if (code != 0) {
    mError("vgId:%d, failed to set %s, since %s", vgId, what, tstrerror(code));
  }
  return code;
}

/**
 * Fills pBlock with one row per vnode replica: dnode id, vgroup id, db name,
 * sync state, role/start time, restore flag, estimated restore finish time,
 * unapplied entry count and buffer segment usage.
 *
 * Iteration state is kept in pShow->pIter so that a later call resumes where
 * this one stopped. The outer loop stops once fewer than TSDB_MAX_REPLICA rows
 * of head-room remain in the requested `rows`.
 *
 * @param pReq   request; pReq->info.node is the mnode handle
 * @param pShow  show object holding the iterator and the running row count
 * @param pBlock destination data block
 * @param rows   maximum number of rows to produce
 * @return number of rows written, or the error code of the failing column write.
 *         On error the vgroup and dnode acquired by this function are released;
 *         pShow->pIter is left untouched (NOTE(review): presumably cancelled by
 *         the show framework through mndCancelGetNextVnode - confirm).
 */
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  SVgObj    *pVgroup = NULL;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    code = 0;

  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
      int32_t    vgId = pVgroup->vgId;
      int32_t    cols = 0;

      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &pGid->dnodeId, vgId, "dnodeId")) != 0) goto _ERR;
      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &pVgroup->vgId, vgId, "vgId")) != 0) goto _ERR;

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, b1, vgId, "dbName")) != 0) goto _ERR;

      // dnode is online?
      pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);

      // Room for a full 19-character restore string plus the varstr header;
      // the previous 20-byte buffer could be overrun by one byte.
      char       buf[20 + VARSTR_HEADER_SIZE] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, buf, vgId, "syncState")) != 0) goto _ERR;

      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &roleTimeMs, vgId, "roleTimeMs")) != 0) goto _ERR;

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &startTimeMs, vgId, "startTimeMs")) != 0) goto _ERR;

      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &pGid->syncRestore, vgId, "syncRestore")) != 0) {
        goto _ERR;
      }

      // Estimated restore finish time; stays an empty string when nothing is pending.
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      char    restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      STR_TO_VARSTR(buf, restoreStr);
      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, buf, vgId, "syncRestore finish time")) != 0) {
        goto _ERR;
      }

      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &unappliedCount, vgId, "unappliedCount")) != 0) {
        goto _ERR;
      }

      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &pGid->bufferSegmentUsed, vgId,
                                     "buffer segment used")) != 0) {
        goto _ERR;
      }

      if ((code = mndSetVnodeShowCol(pBlock, cols++, numOfRows, &pGid->bufferSegmentSize, vgId,
                                     "buffer segment size")) != 0) {
        goto _ERR;
      }

      numOfRows++;
      sdbRelease(pSdb, pDnode);
      pDnode = NULL;
    }

    sdbRelease(pSdb, pVgroup);
    pVgroup = NULL;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

_ERR:
  // Drop the references taken above so a failed column write does not leak them.
  if (pDnode != NULL) sdbRelease(pSdb, pDnode);
  if (pVgroup != NULL) sdbRelease(pSdb, pVgroup);
  return code;
}
1536

1537
/**
 * Cancels an in-progress SDB_VGROUP iteration.
 *
 * @param pMnode mnode handle whose sdb owns the iterator
 * @param pIter  iterator handed to sdbCancelFetchByType
 */
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1541

1542
/**
 * Adds one replica to pVgroup on the first candidate dnode that does not
 * already host a replica of it, then appends the updated vgroup to the
 * transaction as a group redo log with status SDB_STATUS_READY.
 *
 * pArray is sorted in place with mndCompareDnodeVnodes before it is scanned.
 * The chosen dnode's memUsed and numOfVnodes and pVgroup->replica are updated
 * in place.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the redo log
 * @param pVgroup vgroup to extend (modified)
 * @param pArray  array of SDnodeObj candidates (sorted in place)
 * @return 0 on success; TSDB_CODE_MND_NO_ENOUGH_DNODES when every candidate is
 *         already used, TSDB_CODE_MND_NO_ENOUGH_VNODES or
 *         TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE when the first unused candidate
 *         is full, or the error of encoding / appending the raw record.
 */
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  // Slot that the new replica will occupy.
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // Check before the first dereference; previously this test came after pDnode->id was read.
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    // Format fixed: "trans:%id" printed a stray 'd' after the transaction id.
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    // Only one replica is added per call.
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1612

1613
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
15,070✔
1614
                                        SVnodeGid *pDelVgid) {
1615
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
15,070✔
1616
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
67,625✔
1617
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
52,555✔
1618
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
52,555✔
1619
          pDnode->numOfOtherNodes);
1620
  }
1621

1622
  int32_t code = -1;
15,070✔
1623
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
20,368✔
1624
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
20,368✔
1625

1626
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
42,044✔
1627
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
36,746✔
1628
      if (pVgid->dnodeId == pDnode->id) {
36,746✔
1629
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
15,070✔
1630
        pDnode->memUsed -= vgMem;
15,070✔
1631
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
15,070✔
1632
              " used:%" PRId64,
1633
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1634
        pDnode->numOfVnodes--;
15,070✔
1635
        pVgroup->replica--;
15,070✔
1636
        *pDelVgid = *pVgid;
15,070✔
1637
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
15,070✔
1638
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
15,070✔
1639
        code = 0;
15,070✔
1640
        goto _OVER;
15,070✔
1641
      }
1642
    }
1643
  }
1644

1645
_OVER:
×
1646
  if (code != 0) {
15,070✔
1647
    code = TSDB_CODE_APP_ERROR;
×
1648
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
×
1649
           tstrerror(code));
1650
    TAOS_RETURN(code);
×
1651
  }
1652

1653
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
40,130✔
1654
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
25,060✔
1655
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
25,060✔
1656
          pVgid->dnodeId);
1657
  }
1658

1659
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
15,070✔
1660
  if (pVgRaw == NULL) {
15,070✔
1661
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1662
    if (terrno != 0) code = terrno;
×
1663
    TAOS_RETURN(code);
×
1664
  }
1665
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
15,070✔
1666
    sdbFreeRaw(pVgRaw);
×
1667
    TAOS_RETURN(code);
×
1668
  }
1669
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
15,070✔
1670
  if (code != 0) {
15,070✔
1671
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1672
           tstrerror(code), __LINE__);
1673
  }
1674

1675
  TAOS_RETURN(code);
15,070✔
1676
}
1677

1678
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1679
                                                   SVnodeGid *pDelVgid) {
1680
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1681
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1682
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1683
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1684
  }
1685

1686
  int32_t code = -1;
×
1687
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1688
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1689

1690
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1691
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1692
      if (pVgid->dnodeId == pDnode->id) {
×
1693
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1694
        pDnode->memUsed -= vgMem;
×
1695
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1696
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1697
        pDnode->numOfVnodes--;
×
1698
        pVgroup->replica--;
×
1699
        *pDelVgid = *pVgid;
×
1700
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1701
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1702
        code = 0;
×
1703
        goto _OVER;
×
1704
      }
1705
    }
1706
  }
1707

1708
_OVER:
×
1709
  if (code != 0) {
×
1710
    code = TSDB_CODE_APP_ERROR;
×
1711
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1712
    TAOS_RETURN(code);
×
1713
  }
1714

1715
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1716
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1717
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1718
  }
1719

1720
  TAOS_RETURN(code);
×
1721
}
1722

1723
/**
 * Appends a TDMT_DND_CREATE_VNODE redo action, addressed to the dnode named by
 * pVgid, to the transaction. TSDB_CODE_VND_ALREADY_EXIST is accepted as
 * success for the action.
 *
 * The dnode stays acquired until the request has been built, because the
 * dnode object is handed to mndBuildCreateVnodeReq; it used to be released
 * before that call.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database the vgroup belongs to
 * @param pVgroup vgroup whose vnode is created
 * @param pVgid   replica descriptor selecting the target dnode
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the dnode cannot be acquired or the request cannot be
 *         built; otherwise the error from mndTransAppendRedoAction. The
 *         request buffer is freed here only when appending fails.
 */
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // Capture the builder's error before the release call can touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1749

1750
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
6,300✔
1751
                                       SDnodeObj *pDnode) {
1752
  int32_t      code = 0;
6,300✔
1753
  STransAction action = {0};
6,300✔
1754

1755
  action.epSet = mndGetDnodeEpset(pDnode);
6,300✔
1756

1757
  int32_t contLen = 0;
6,300✔
1758
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
6,300✔
1759
  if (pReq == NULL) {
6,300✔
1760
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1761
    if (terrno != 0) code = terrno;
×
1762
    TAOS_RETURN(code);
×
1763
  }
1764

1765
  action.pCont = pReq;
6,300✔
1766
  action.contLen = contLen;
6,300✔
1767
  action.msgType = TDMT_DND_CREATE_VNODE;
6,300✔
1768
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
6,300✔
1769
  action.groupId = pVgroup->vgId;
6,300✔
1770

1771
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
6,300✔
1772
    taosMemoryFree(pReq);
×
1773
    TAOS_RETURN(code);
×
1774
  }
1775

1776
  TAOS_RETURN(code);
6,300✔
1777
}
1778

1779
/**
 * Appends a TDMT_VND_ALTER_CONFIRM redo action addressed to the vgroup's
 * endpoint set. The message consists of a bare SMsgHead with contLen and vgId
 * in network byte order.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     not referenced by this function
 * @param pVgroup vgroup to confirm
 * @return 0 on success, terrno if the header cannot be allocated, otherwise
 *         the error from mndTransAppendRedoAction. The header is freed here
 *         only when appending fails.
 */
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction act = {0};
  act.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);

  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(headLen);
  if (pMsg == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsg->contLen = htonl(headLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  act.msgType = TDMT_VND_ALTER_CONFIRM;
  // An incorrect redirect result causes this error, so it is marked as retryable.
  act.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  act.groupId = pVgroup->vgId;
  act.pCont = pMsg;
  act.contLen = headLen;

  int32_t code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1808

1809
/**
 * Appends a TDMT_SYNC_CONFIG_CHANGE redo action addressed to the new vgroup's
 * endpoint set. The payload is an SMsgHead (contLen and vgId in network byte
 * order) followed by the alter-replica request built for `dnodeId`.
 *
 * @param pMnode     mnode handle
 * @param pTrans     transaction receiving the action
 * @param pDb        database the vgroup belongs to
 * @param pOldVgroup not referenced by this function
 * @param pNewVgroup vgroup layout to switch to
 * @param dnodeId    dnode the alter-replica request is built for
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the request cannot be built; terrno if the message buffer
 *         cannot be allocated; otherwise the error from
 *         mndTransAppendRedoAction.
 */
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  STransAction act = {0};
  int32_t      bodyLen = 0;
  int32_t      code = 0;

  act.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  void *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);

  // The request body goes directly behind the header; the temporary copy is no longer needed.
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  act.msgType = TDMT_SYNC_CONFIG_CHANGE;
  act.pCont = pMsg;
  act.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1848

1849
/**
 * Appends a TDMT_VND_ALTER_HASHRANGE redo action addressed to the vgroup's
 * endpoint set. TSDB_CODE_VND_ALREADY_EXIST is accepted as success for the
 * action. No groupId is set on the action.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param srcVgId id of the source vgroup, forwarded to the request builder
 * @param pVgroup destination vgroup
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the request cannot be built; otherwise the error from
 *         mndTransAppendRedoAction. The request buffer is freed here only when
 *         appending fails.
 */
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  STransAction act = {0};
  int32_t      len = 0;
  int32_t      code = 0;

  act.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pCont = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &len);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  act.msgType = TDMT_VND_ALTER_HASHRANGE;
  act.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  act.pCont = pCont;
  act.contLen = len;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pCont);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1875

1876
/**
 * Appends a TDMT_VND_ALTER_CONFIG redo action addressed to the vgroup's
 * endpoint set.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database whose configuration is sent
 * @param pVgroup target vgroup
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the request cannot be built; otherwise the error from
 *         mndTransAppendRedoAction. The request buffer is freed here only when
 *         appending fails.
 */
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction act = {0};
  int32_t      len = 0;
  int32_t      code = 0;

  act.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pCont = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &len);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  act.msgType = TDMT_VND_ALTER_CONFIG;
  act.groupId = pVgroup->vgId;
  act.pCont = pCont;
  act.contLen = len;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
1901

1902
/**
 * Encodes the vgroup and appends it to the transaction's prepare log with
 * status SDB_STATUS_CREATING.
 *
 * @param pMnode mnode handle (not referenced by this function)
 * @param pTrans transaction receiving the prepare log
 * @param pVg    vgroup to encode
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; the error from mndTransAppendPrepareLog, in
 *         which case the raw record is freed here; or the error from
 *         sdbSetRawStatus. The latter was previously logged but never
 *         propagated, because its result was not stored in `code`.
 */
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _err;
  }

  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);

  // Past this point the record has been appended to the transaction, so it is
  // not freed here even when setting the status fails (same as before).
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);

_err:
  sdbFreeRaw(pRaw);
  TAOS_RETURN(code);
}
1926

1927
/**
 * Appends a TDMT_VND_ALTER_REPLICA redo action addressed to the endpoint set
 * of dnode `dnodeId`.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database the vgroup belongs to
 * @param pVgroup vgroup whose replica layout is sent
 * @param dnodeId dnode that receives the request
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the dnode cannot be acquired or the request cannot be
 *         built; otherwise the error from mndTransAppendRedoAction. The
 *         request buffer is freed here only when appending fails.
 */
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode; release it right after copying.
  STransAction act = {0};
  act.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t len = 0;
  void   *pCont = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &len);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  act.msgType = TDMT_VND_ALTER_REPLICA;
  act.groupId = pVgroup->vgId;
  act.pCont = pCont;
  act.contLen = len;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
1960

1961
/**
 * Appends a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action addressed to the
 * endpoint set of dnode `dnodeId`. TSDB_CODE_VND_ALREADY_IS_VOTER is accepted
 * as success and TSDB_CODE_VND_NOT_CATCH_UP is set as the retry code. No
 * groupId is set on the action.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database the vgroup belongs to
 * @param pVgroup vgroup being checked
 * @param dnodeId dnode that receives the request
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the dnode cannot be acquired or the request cannot be
 *         built; otherwise the error from mndTransAppendRedoAction. The
 *         request buffer is freed here only when appending fails.
 */
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode; release it right after copying.
  STransAction act = {0};
  act.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t len = 0;
  void   *pCont = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &len);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  act.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  act.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  act.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  act.pCont = pCont;
  act.contLen = len;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
1995

1996
/**
 * Appends a TDMT_DND_ALTER_VNODE_TYPE redo action addressed to the endpoint
 * set of dnode `dnodeId`. TSDB_CODE_VND_ALREADY_IS_VOTER is accepted as
 * success and TSDB_CODE_VND_NOT_CATCH_UP is set as the retry code.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database the vgroup belongs to
 * @param pVgroup vgroup whose replica layout is sent
 * @param dnodeId dnode that receives the request
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the dnode cannot be acquired or the request cannot be
 *         built; otherwise the error from mndTransAppendRedoAction. The
 *         request buffer is freed here only when appending fails.
 */
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode; release it right after copying.
  STransAction act = {0};
  act.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t len = 0;
  void   *pCont = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &len);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  act.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  act.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  act.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  act.groupId = pVgroup->vgId;
  act.pCont = pCont;
  act.contLen = len;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
2031

2032
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
6,300✔
2033
                                          SDnodeObj *pDnode) {
2034
  int32_t      code = 0;
6,300✔
2035
  STransAction action = {0};
6,300✔
2036
  action.epSet = mndGetDnodeEpset(pDnode);
6,300✔
2037

2038
  int32_t contLen = 0;
6,300✔
2039
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
6,300✔
2040
  if (pReq == NULL) {
6,300✔
2041
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2042
    if (terrno != 0) code = terrno;
×
2043
    TAOS_RETURN(code);
×
2044
  }
2045

2046
  action.pCont = pReq;
6,300✔
2047
  action.contLen = contLen;
6,300✔
2048
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
6,300✔
2049
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
6,300✔
2050
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
6,300✔
2051
  action.groupId = pVgroup->vgId;
6,300✔
2052

2053
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
6,300✔
2054
    taosMemoryFree(pReq);
×
2055
    TAOS_RETURN(code);
×
2056
  }
2057

2058
  TAOS_RETURN(code);
6,300✔
2059
}
2060

2061
/**
 * Appends a TDMT_VND_DISABLE_WRITE redo action addressed to the endpoint set
 * of dnode `dnodeId`. No groupId is set on the action.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database the vgroup belongs to
 * @param pVgroup vgroup whose id is placed in the request
 * @param dnodeId dnode that receives the request
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the dnode cannot be acquired or the request cannot be
 *         built; otherwise the error from mndTransAppendRedoAction. The
 *         request buffer is freed here only when appending fails.
 */
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode; release it right after copying.
  STransAction act = {0};
  act.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t len = 0;
  void   *pCont = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &len);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  act.msgType = TDMT_VND_DISABLE_WRITE;
  act.pCont = pCont;
  act.contLen = len;

  code = mndTransAppendRedoAction(pTrans, &act);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
2094

2095
/**
 * Appends a TDMT_DND_DROP_VNODE action, addressed to the dnode named by pVgid,
 * to the transaction's redo or undo list. TSDB_CODE_VND_NOT_EXIST is accepted
 * as success for the action.
 *
 * The dnode stays acquired until the request has been built, because the
 * dnode object is handed to mndBuildDropVnodeReq; it used to be released
 * before that call.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the action
 * @param pDb     database the vgroup belongs to
 * @param pVgroup vgroup whose vnode is dropped
 * @param pVgid   replica descriptor selecting the target dnode
 * @param isRedo  true to append as redo action, false to append as undo action
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the dnode cannot be acquired or the request cannot be
 *         built; otherwise the error from the append call. The request buffer
 *         is freed here only when appending fails.
 */
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // Capture the builder's error before the release call can touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  if (isRedo) {
    code = mndTransAppendRedoAction(pTrans, &action);
  } else {
    code = mndTransAppendUndoAction(pTrans, &action);
  }
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2137

2138
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
17,089✔
2139
                                    SArray *pArray, bool force, bool unsafe) {
2140
  int32_t code = 0;
17,089✔
2141
  SVgObj  newVg = {0};
17,089✔
2142
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
17,089✔
2143

2144
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
17,089✔
2145
  for (int32_t i = 0; i < newVg.replica; ++i) {
55,282✔
2146
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
38,193✔
2147
  }
2148

2149
  if (!force) {
17,089✔
2150
#if 1
2151
    {
2152
#else
2153
    if (newVg.replica == 1) {
2154
#endif
2155
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
17,089✔
2156
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
17,089✔
2157
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
55,282✔
2158
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
38,193✔
2159
      }
2160
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
17,089✔
2161
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
17,089✔
2162

2163
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
17,089✔
2164
      newVg.replica--;
17,089✔
2165
      SVnodeGid del = newVg.vnodeGid[vnIndex];
17,089✔
2166
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
17,089✔
2167
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
17,089✔
2168
      {
2169
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
17,089✔
2170
        if (pRaw == NULL) {
17,089✔
2171
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2172
          if (terrno != 0) code = terrno;
×
2173
          TAOS_RETURN(code);
×
2174
        }
2175
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
17,089✔
2176
          sdbFreeRaw(pRaw);
×
2177
          TAOS_RETURN(code);
×
2178
        }
2179
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
17,089✔
2180
        if (code != 0) {
17,089✔
2181
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2182
          return code;
×
2183
        }
2184
      }
2185

2186
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
17,089✔
2187
      for (int32_t i = 0; i < newVg.replica; ++i) {
55,282✔
2188
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
38,193✔
2189
      }
2190
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
17,089✔
2191
#if 1
2192
    }
2193
#else
2194
    } else {  // new replica == 3
2195
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2196
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2197
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2198
      newVg.replica--;
2199
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2200
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2201
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2202
      {
2203
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2204
        if (pRaw == NULL) return -1;
2205
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2206
          sdbFreeRaw(pRaw);
2207
          return -1;
2208
        }
2209
      }
2210

2211
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2212
      for (int32_t i = 0; i < newVg.replica; ++i) {
2213
        if (i == vnIndex) continue;
2214
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2215
      }
2216
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2217
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2218
    }
2219
#endif
2220
  } else {
2221
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2222
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2223
    newVg.replica--;
×
2224
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2225
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2226
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2227
    {
2228
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2229
      if (pRaw == NULL) {
×
2230
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2231
        if (terrno != 0) code = terrno;
×
2232
        TAOS_RETURN(code);
×
2233
      }
2234
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2235
        sdbFreeRaw(pRaw);
×
2236
        TAOS_RETURN(code);
×
2237
      }
2238
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2239
      if (code != 0) {
×
2240
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2241
        return code;
×
2242
      }
2243
    }
2244

2245
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2246
      if (i != vnIndex) {
×
2247
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2248
      }
2249
    }
2250
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2251
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2252

2253
    if (newVg.replica == 1) {
×
2254
      if (force && !unsafe) {
×
2255
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2256
      }
2257

2258
      SSdb *pSdb = pMnode->pSdb;
×
2259
      void *pIter = NULL;
×
2260

2261
      while (1) {
×
2262
        SStbObj *pStb = NULL;
×
2263
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2264
        if (pIter == NULL) break;
×
2265

2266
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2267
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2268
            sdbCancelFetch(pSdb, pIter);
×
2269
            sdbRelease(pSdb, pStb);
×
2270
            TAOS_RETURN(code);
×
2271
          }
2272
        }
2273

2274
        sdbRelease(pSdb, pStb);
×
2275
      }
2276

2277
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2278
    }
2279
  }
2280

2281
  {
2282
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
17,089✔
2283
    if (pRaw == NULL) {
17,089✔
2284
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2285
      if (terrno != 0) code = terrno;
×
2286
      TAOS_RETURN(code);
×
2287
    }
2288
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
17,089✔
2289
      sdbFreeRaw(pRaw);
×
2290
      TAOS_RETURN(code);
×
2291
    }
2292
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
17,089✔
2293
    if (code != 0) {
17,089✔
2294
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2295
      return code;
×
2296
    }
2297
  }
2298

2299
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
17,089✔
2300
  for (int32_t i = 0; i < newVg.replica; ++i) {
55,282✔
2301
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
38,193✔
2302
  }
2303
  TAOS_RETURN(code);
17,089✔
2304
}
2305

2306
/*
 * For every vgroup that has a replica on dnode delDnodeId, append to pTrans the
 * steps that move that replica elsewhere (see mndSetMoveVgroupInfoToTrans()).
 *
 * pMnode      mnode context whose sdb is iterated
 * pTrans      transaction that receives the steps
 * delDnodeId  dnode whose vnodes are to be moved away
 * force       forwarded to mndSetMoveVgroupInfoToTrans()
 * unsafe      forwarded to mndSetMoveVgroupInfoToTrans()
 *
 * Returns 0 when all affected vgroups were handled. The scan stops at the first
 * failure and that error code is returned; this includes a vgroup whose
 * database cannot be acquired.
 */
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // locate the replica hosted on the dnode being vacated, if any
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // the callee dereferences pDb, so a failed acquire must abort the move instead of being passed on
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      // leaving the scan early: the iterator has to be cancelled explicitly
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2349

2350
/*
 * Grow pVgroup by one member hosted on newDnodeId and append the matching
 * steps to pTrans.
 *
 * pVgroup is modified in place: the slot at index `replica` is filled and
 * `replica` is incremented. The new member is first recorded as an offline
 * learner (redo log + create action), then switched to voter, after which the
 * other members are altered again and a confirm action is appended.
 *
 * Returns 0 on success, otherwise the code of the step that failed.
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // take the first unused slot for the new member
  SVnodeGid *pAdded = &pVgroup->vnodeGid[pVgroup->replica];
  pVgroup->replica++;
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  // redo log carrying the enlarged group
  SSdbRaw *pRedoRaw = mndVgroupActionEncode(pVgroup);
  if (pRedoRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // learner phase: alter every pre-existing member, then create the new vnode
  int32_t peer = 0;
  while (peer < pVgroup->replica - 1) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[peer].dnodeId));
    ++peer;
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // voter phase: change the role of the new member, then alter the others again
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  peer = 0;
  while (peer < pVgroup->replica - 1) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[peer].dnodeId));
    ++peer;
  }

  // confirm phase
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2396

2397
/*
 * Shrink pVgroup by removing the member hosted on delDnodeId and append the
 * matching steps to pTrans.
 *
 * pVgroup is modified in place: `replica` is decremented, the last slot is
 * moved into the removed member's slot and the last slot is zeroed. The new
 * layout is appended as a redo log, followed by a drop-vnode action for the
 * removed member, an alter-replica action for each remaining member and a
 * confirm action.
 *
 * Returns 0 on success or when delDnodeId is not a member of the vgroup (in
 * which case nothing is appended); otherwise the code of the step that failed.
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  SVnodeGid *pGid = NULL;
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      pGid = &pVgroup->vnodeGid[i];
      break;
    }
  }

  if (pGid == NULL) return 0;

  // Struct assignment instead of memcpy: when the removed member is the last
  // slot, source and destination are the same object, which memcpy does not allow.
  SVnodeGid delGid = *pGid;
  pVgroup->replica--;
  *pGid = pVgroup->vnodeGid[pVgroup->replica];
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2442

2443
/*
 * Build and prepare the "red-vgroup" transaction that replaces up to three
 * members of pVgroup: for each (pNewN, pOldN) pair where both are non-NULL, a
 * replica is added on pNewN and the replica on pOldN is removed.
 *
 * Before a pair is processed the target dnode is checked for a free vnode slot
 * and for enough memory; on success its memUsed is increased by the vgroup's
 * memory. The caller's pVgroup is not modified: the work is done on a local
 * copy whose final state is appended as the commit log.
 *
 * Reference handling: this function drops one reference on pDb before it
 * returns, on success and on failure alike. A caller must therefore not
 * release that same reference again.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 */
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
                                     SDnodeObj *pOld3) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  // all edits go to a private copy of the vgroup
  SVgObj newVg = {0};
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
  for (int32_t slot = 0; slot < newVg.replica; ++slot) {
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, slot, newVg.vnodeGid[slot].dnodeId,
          syncStr(newVg.vnodeGid[slot].syncState));
  }

  // the three (new, old) pairs are handled in order with identical logic
  SDnodeObj *const incoming[3] = {pNew1, pNew2, pNew3};
  SDnodeObj *const outgoing[3] = {pOld1, pOld2, pOld3};
  for (int32_t pairIdx = 0; pairIdx < 3; ++pairIdx) {
    SDnodeObj *pAdd = incoming[pairIdx];
    SDnodeObj *pDel = outgoing[pairIdx];
    if (pAdd == NULL || pDel == NULL) continue;

    // the target must still have a free vnode slot
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pAdd->id);
    if (numOfVnodes >= pAdd->numOfSupportVnodes) {
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pAdd->id, numOfVnodes,
             pAdd->numOfSupportVnodes);
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
      goto _OVER;
    }

    // the target must have memory left after accounting for this vgroup
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pAdd->memAvail - vgMem - pAdd->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pAdd->id, pAdd->memAvail, pAdd->memUsed);
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
      goto _OVER;
    }
    pAdd->memUsed += vgMem;

    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pAdd->id), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pDel->id), NULL, _OVER);
  }

  // commit log with the final membership
  {
    SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVg);
    if (pCommitRaw == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      goto _OVER;
    }

    code = mndTransAppendCommitlog(pTrans, pCommitRaw);
    if (code != 0) {
      sdbFreeRaw(pCommitRaw);
      goto _OVER;
    }

    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
      goto _OVER;
    }
  }

  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
  for (int32_t slot = 0; slot < newVg.replica; ++slot) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, slot, newVg.vnodeGid[slot].dnodeId);
  }

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
2567

2568
/*
 * Handler for a redistribute-vgroup request.
 *
 * The request names the target dnodes (dnodeId1..3) for a vgroup. After the
 * privilege check the handler compares the requested dnodes with the vgroup's
 * current members, derives the dnodes to add and the dnodes to remove, checks
 * that each of them is online and hands the pairs to mndRedistributeVgroup().
 * Vgroups with 1, 2 or 3 replicas are supported; any other replica count is
 * rejected with TSDB_CODE_MND_REQ_REJECTED. A request that matches the current
 * placement returns 0 without starting a transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction was started, 0 when
 * nothing had to change, otherwise an error code.
 *
 * Reference handling: mndRedistributeVgroup() releases the pDb reference it is
 * given, so pDb is cleared after each call to keep the common cleanup at _OVER
 * from releasing it a second time.
 */
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SDnodeObj *pNew1 = NULL;
  SDnodeObj *pNew2 = NULL;
  SDnodeObj *pNew3 = NULL;
  SDnodeObj *pOld1 = NULL;
  SDnodeObj *pOld2 = NULL;
  SDnodeObj *pOld3 = NULL;
  SVgObj    *pVgroup = NULL;
  SDbObj    *pDb = NULL;
  int32_t    code = -1;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    newDnodeId[3] = {0};  // requested dnodes that are not yet members
  int32_t    oldDnodeId[3] = {0};  // current members that are not requested
  int32_t    newIndex = -1;
  int32_t    oldIndex = -1;

  SRedistributeVgroupReq req = {0};
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
    goto _OVER;
  }

  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  if (pVgroup->mountVgId) {
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
    goto _OVER;
  }
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (pVgroup->replica == 1) {
    // exactly one target dnode must be given
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
      code = TSDB_CODE_MND_INVALID_REPLICA;
      goto _OVER;
    }

    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
      code = 0;
      goto _OVER;
    }

    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
    if (pNew1 == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    if (!mndIsDnodeOnline(pNew1, curMs)) {
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      goto _OVER;
    }

    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
    if (pOld1 == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    if (!mndIsDnodeOnline(pOld1, curMs)) {
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      goto _OVER;
    }

    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
    pDb = NULL;  // reference already released by mndRedistributeVgroup

  } else if (pVgroup->replica == 3) {
    // three distinct target dnodes must be given
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
      code = TSDB_CODE_MND_INVALID_REPLICA;
      goto _OVER;
    }

    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
      code = TSDB_CODE_MND_INVALID_REPLICA;
      goto _OVER;
    }

    // requested dnodes that are not members yet -> to be added
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
      newDnodeId[++newIndex] = req.dnodeId1;
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
    }

    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
      newDnodeId[++newIndex] = req.dnodeId2;
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
    }

    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
      newDnodeId[++newIndex] = req.dnodeId3;
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
    }

    // current members that are not requested -> to be removed
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
    }

    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
    }

    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
    }

    // acquire every dnode involved and require it to be online
    if (newDnodeId[0] != 0) {
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
      if (pNew1 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pNew1, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (newDnodeId[1] != 0) {
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
      if (pNew2 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pNew2, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (newDnodeId[2] != 0) {
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
      if (pNew3 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pNew3, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (oldDnodeId[0] != 0) {
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
      if (pOld1 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pOld1, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (oldDnodeId[1] != 0) {
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
      if (pOld2 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pOld2, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (oldDnodeId[2] != 0) {
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
      if (pOld3 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pOld3, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
      code = 0;
      goto _OVER;
    }

    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
    pDb = NULL;  // reference already released by mndRedistributeVgroup

  } else if (pVgroup->replica == 2) {
    // two distinct target dnodes must be given
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
      code = TSDB_CODE_MND_INVALID_REPLICA;
      goto _OVER;
    }

    if (req.dnodeId1 == req.dnodeId2) {
      code = TSDB_CODE_MND_INVALID_REPLICA;
      goto _OVER;
    }

    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
      newDnodeId[++newIndex] = req.dnodeId1;
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
    }

    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
      newDnodeId[++newIndex] = req.dnodeId2;
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
    }

    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
    }

    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
    }

    if (newDnodeId[0] != 0) {
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
      if (pNew1 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pNew1, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (newDnodeId[1] != 0) {
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
      if (pNew2 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pNew2, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (oldDnodeId[0] != 0) {
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
      if (pOld1 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pOld1, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (oldDnodeId[1] != 0) {
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
      if (pOld2 == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if (!mndIsDnodeOnline(pOld2, curMs)) {
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
        goto _OVER;
      }
    }

    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
      code = 0;
      goto _OVER;
    }

    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
    pDb = NULL;  // reference already released by mndRedistributeVgroup
  } else {
    code = TSDB_CODE_MND_REQ_REJECTED;
    goto _OVER;
  }

  // a prepared transaction is reported to the client as "in progress"
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char obj[33] = {0};
  (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);

  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
           req.dnodeId3, tstrerror(code));
  }

  // pointers that were never acquired (or already handed over) are NULL here
  mndReleaseDnode(pMnode, pNew1);
  mndReleaseDnode(pMnode, pNew2);
  mndReleaseDnode(pMnode, pNew3);
  mndReleaseDnode(pMnode, pOld1);
  mndReleaseDnode(pMnode, pOld2);
  mndReleaseDnode(pMnode, pOld3);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseDb(pMnode, pDb);
  tFreeSRedistributeVgroupReq(&req);

  TAOS_RETURN(code);
}
2902

2903
/*
 * Serialize an SForceBecomeFollowerReq for a vgroup into a newly allocated
 * buffer that begins with an SMsgHead.
 *
 * pMnode   - not used by this function
 * pVgroup  - vgroup whose vgId is written to both the header and the request
 * dnodeId  - not used by this function
 * pContLen - out: total buffer length (header + payload); written only on success
 *
 * Returns the buffer (allocated with taosMemoryMalloc, to be released with
 * taosMemoryFree), or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 * sizing, allocation or serialization fails.
 */
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // A NULL buffer makes the serializer report the payload size only.
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Header fields are stored in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload starts right after the header, so only bodyLen bytes remain;
  // passing the full contLen here would overstate the writable space.
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
2933

2934
/*
 * Append a TDMT_SYNC_FORCE_FOLLOWER redo action, addressed to the given
 * dnode, to the transaction.
 *
 * The dnode is acquired only long enough to copy its endpoint set. The request
 * buffer is handed to the transaction when the append succeeds and is freed
 * here when it fails.
 *
 * Returns 0 on success; otherwise terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL
 * when terrno is unset) for a missing dnode / request, or the append error.
 */
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The transaction did not take the buffer, so release it here.
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
2966

2967
/*
 * Serialize an SAlterVnodeElectBaselineReq carrying the vgroup id and the
 * election baseline `ms` into a newly allocated buffer (no message header is
 * prepended here).
 *
 * pMnode, pDb and dnodeId are not used by this function.
 *
 * pContLen - out: serialized length; written only on success
 *
 * Returns the buffer (to be released with taosMemoryFree), or NULL with terrno
 * set to TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen, int32_t ms) {
  SAlterVnodeElectBaselineReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.electBaseLine = ms;

  // Sizing pass: a NULL buffer only reports the encoded length.
  int32_t reqLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (reqLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t written = tSerializeSAlterVnodeReplicaReq(pBuf, reqLen, &alterReq);
  if (written < 0) {
    // Log first so the message still shows the serializer's own error text.
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = reqLen;
  return pBuf;
}
2995

2996
/*
 * Append a TDMT_VND_ALTER_ELECTBASELINE redo action for one dnode of the
 * vgroup, carrying the election baseline `ms`. The action's groupId is the
 * vgroup id.
 *
 * The request buffer is handed to the transaction when the append succeeds and
 * is freed here when it fails.
 *
 * Returns 0 on success; otherwise terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL
 * when terrno is unset) for a missing dnode / request, or the append error.
 */
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                                             SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &msgLen, ms);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_VND_ALTER_ELECTBASELINE;
  action.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The transaction did not take the buffer, so release it here.
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
3029

3030
/*
 * Append one election-baseline action per replica of the vgroup.
 *
 * The replica in slot `index % 3` gets a baseline of 1500 ms, every other
 * replica gets 5000 ms. A negative index (the caller in this file passes -1)
 * matches no slot, so all replicas get 5000 ms.
 * NOTE(review): presumably the lower baseline makes that replica win the next
 * election sooner - confirm against the sync module.
 *
 * Only slots below pVgroup->replica (at most 3, as before) are visited. The
 * previous fixed 3-iteration loop also visited vnodeGid[2] of a 2-replica
 * vgroup, a slot that is not part of the group.
 *
 * Returns -1 (terrno untouched) when the vgroup has at most one replica,
 * the first failing action's code otherwise, or 0 on success.
 */
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans,
                                                              int32_t index) {
  int32_t code = 0;
  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // Never look past the configured replicas; keep the historical cap of 3.
  int32_t slots = (replica < 3) ? replica : 3;

  for (int32_t i = 0; i < slots; i++) {
    int32_t dnodeId = pVgroup->vnodeGid[i].dnodeId;
    if (i == index % 3) {
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, dnodeId);
      TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, dnodeId, 1500));
    } else {
      TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, dnodeId, 5000));
    }
  }
  return code;
}
3055

3056
/*
 * Add the actions that move leadership of a vgroup to the transaction.
 *
 * Steps, in order, when the current leader's dnode exists and is online:
 *   1. election-baseline actions with the caller's `index`
 *   2. a force-follower action sent to the current leader's dnode
 *   3. an alter-vnode confirm action
 *   4. election-baseline actions with index -1
 *   5. a check that the vgroup's database can still be acquired
 *
 * When no replica reports TAOS_SYNC_STATE_LEADER, or its dnode is missing or
 * offline, nothing is appended and 0 is returned.
 *
 * Returns -1 (terrno untouched) when the vgroup has at most one replica,
 * otherwise 0 or the first failing step's error code.
 */
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t       code = 0;
  const int32_t vgid = pVgroup->vgId;
  const int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // Find the dnode of the first replica that reports itself as leader; 0 when none does.
  int32_t dnodeId = 0;
  for (int slot = 0; slot < replica; slot++) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[slot];
    if (pGid->syncState == TAOS_SYNC_STATE_LEADER) {
      dnodeId = pGid->dnodeId;
      break;
    }
  }

  int64_t    nowMs = taosGetTimestampMs();
  SDnodeObj *pLeader = mndAcquireDnode(pMnode, dnodeId);
  bool       exist = (pLeader != NULL);
  bool       online = false;
  if (exist) {
    online = mndIsDnodeOnline(pLeader, nowMs);
    mndReleaseDnode(pMnode, pLeader);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
          online);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgid, dnodeId);
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));

  code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));

  // Index -1 matches no slot, so every replica gets the same baseline again.
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));

  // The database is acquired only to verify that it exists.
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
3116

3117
// Implementation hook for the balance-vgroup-leader request. When
// TD_ENTERPRISE is not defined, the stub further below is compiled instead.
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

// Entry point: forwards the request unchanged to the implementation hook.
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  return mndProcessVgroupBalanceLeaderMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
3124

3125
/*
 * Re-account the memory each dnode in pArray uses once the vgroup switches
 * from (pOldDb, pOldVgroup) to (pNewDb, pNewVgroup), and reject the change as
 * soon as one dnode has no memory left.
 *
 * For every dnode, the vgroup's old memory is subtracted and its new memory is
 * added, but only when the dnode hosts a replica of the respective vgroup.
 * pDnode->memUsed is updated in place, so entries visited before a failure
 * keep their adjusted value.
 *
 * Returns 0 when every dnode still has memAvail - memUsed > 0, otherwise
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE.
 */
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  const int32_t numOfDnodes = (int32_t)taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfDnodes; ++idx) {
    SDnodeObj *pDnode = taosArrayGet(pArray, idx);
    bool       hosted = false;
    int64_t    memBefore = 0;
    int64_t    memAfter = 0;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pDnode->id, pDnode->memAvail, pDnode->memUsed);

    // Memory the vgroup occupies on this dnode under the old settings.
    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      memBefore = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hosted = true;
    }

    // Memory the vgroup will occupy on this dnode under the new settings.
    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      memAfter = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hosted = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pDnode->id, memBefore, memAfter);

    pDnode->memUsed = pDnode->memUsed - memBefore + memAfter;

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    if (hosted) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }

  return 0;
}
3165

3166
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
246,881✔
3167
                                  SArray *pArray, SVgObj *pNewVgroup) {
3168
  int32_t code = 0;
246,881✔
3169
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
246,881✔
3170

3171
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
246,881✔
3172
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
212,040✔
3173
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
212,040✔
3174
    return 0;
212,040✔
3175
  }
3176

3177
  // mndTransSetGroupParallel(pTrans);
3178

3179
  if (pNewDb->cfg.replications == 3) {
34,841✔
3180
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
28,285✔
3181
          pVgroup->vnodeGid[0].dnodeId);
3182

3183
    // add second
3184
    if (pNewVgroup->replica == 1) {
28,285✔
3185
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
28,285✔
3186
    }
3187

3188
    // learner stage
3189
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,550✔
3190
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
27,550✔
3191
    TAOS_CHECK_RETURN(
27,550✔
3192
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3193

3194
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
27,550✔
3195

3196
    // follower stage
3197
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,550✔
3198
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
27,550✔
3199
    TAOS_CHECK_RETURN(
27,550✔
3200
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3201

3202
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
27,550✔
3203

3204
    // add third
3205
    if (pNewVgroup->replica == 2) {
27,550✔
3206
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
27,550✔
3207
    }
3208

3209
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,323✔
3210
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,323✔
3211
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,323✔
3212
    TAOS_CHECK_RETURN(
27,323✔
3213
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3214
    TAOS_CHECK_RETURN(
27,323✔
3215
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3216
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
27,323✔
3217

3218
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
27,323✔
3219
  } else if (pNewDb->cfg.replications == 1) {
6,556✔
3220
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
5,080✔
3221
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3222
          pVgroup->vnodeGid[2].dnodeId);
3223

3224
    SVnodeGid del1 = {0};
5,080✔
3225
    SVnodeGid del2 = {0};
5,080✔
3226
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
5,080✔
3227
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
5,080✔
3228
    TAOS_CHECK_RETURN(
5,080✔
3229
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3230
    TAOS_CHECK_RETURN(
5,080✔
3231
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3232
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
5,080✔
3233

3234
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
5,080✔
3235
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
5,080✔
3236
    TAOS_CHECK_RETURN(
5,080✔
3237
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3238
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
5,080✔
3239
  } else if (pNewDb->cfg.replications == 2) {
1,476✔
3240
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,476✔
3241
          pVgroup->vnodeGid[0].dnodeId);
3242

3243
    // add second
3244
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,476✔
3245

3246
    // learner stage
3247
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,476✔
3248
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,476✔
3249
    TAOS_CHECK_RETURN(
1,476✔
3250
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3251

3252
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,476✔
3253

3254
    // follower stage
3255
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,476✔
3256
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,476✔
3257
    TAOS_CHECK_RETURN(
1,476✔
3258
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3259

3260
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,476✔
3261
  } else {
3262
    return -1;
×
3263
  }
3264

3265
  mndSortVnodeGid(pNewVgroup);
33,879✔
3266

3267
  {
3268
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
33,879✔
3269
    if (pVgRaw == NULL) {
33,879✔
3270
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3271
      if (terrno != 0) code = terrno;
×
3272
      TAOS_RETURN(code);
×
3273
    }
3274
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
33,879✔
3275
      sdbFreeRaw(pVgRaw);
×
3276
      TAOS_RETURN(code);
×
3277
    }
3278
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
33,879✔
3279
    if (code != 0) {
33,879✔
3280
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3281
      TAOS_RETURN(code);
×
3282
    }
3283
  }
3284

3285
  TAOS_RETURN(code);
33,879✔
3286
}
3287

3288
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3289
                                      SArray *pArray) {
3290
  int32_t code = 0;
×
3291
  SVgObj  newVgroup = {0};
×
3292
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3293

3294
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3295
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3296
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3297
    return 0;
×
3298
  }
3299

3300
  mndTransSetSerial(pTrans);
×
3301

3302
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3303
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3304

3305
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3306
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3307
          pVgroup->vnodeGid[0].dnodeId);
3308

3309
    // add second
3310
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3311
    // add third
3312
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3313

3314
    // add learner stage
3315
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3316
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3317
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3318
    TAOS_CHECK_RETURN(
×
3319
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3320
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3321
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3322
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3323
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3324
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3325
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3326
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3327
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3328

3329
    // check learner
3330
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3331
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3332
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3333
    TAOS_CHECK_RETURN(
×
3334
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3335
    TAOS_CHECK_RETURN(
×
3336
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3337

3338
    // change raft type
3339
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3340
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3341
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3342
    TAOS_CHECK_RETURN(
×
3343
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3344

3345
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3346

3347
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3348
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3349
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3350
    TAOS_CHECK_RETURN(
×
3351
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3352

3353
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3354

3355
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3356
    if (pVgRaw == NULL) {
×
3357
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3358
      if (terrno != 0) code = terrno;
×
3359
      TAOS_RETURN(code);
×
3360
    }
3361
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3362
      sdbFreeRaw(pVgRaw);
×
3363
      TAOS_RETURN(code);
×
3364
    }
3365
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3366
    if (code != 0) {
×
3367
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3368
             __LINE__);
3369
      TAOS_RETURN(code);
×
3370
    }
3371
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3372
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3373
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3374

3375
    SVnodeGid del1 = {0};
×
3376
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3377

3378
    TAOS_CHECK_RETURN(
×
3379
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3380

3381
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3382

3383
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3384

3385
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3386
    if (pVgRaw == NULL) {
×
3387
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3388
      if (terrno != 0) code = terrno;
×
3389
      TAOS_RETURN(code);
×
3390
    }
3391
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3392
      sdbFreeRaw(pVgRaw);
×
3393
      TAOS_RETURN(code);
×
3394
    }
3395
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3396
    if (code != 0) {
×
3397
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3398
             __LINE__);
3399
      TAOS_RETURN(code);
×
3400
    }
3401

3402
    SVnodeGid del2 = {0};
×
3403
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3404

3405
    TAOS_CHECK_RETURN(
×
3406
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3407

3408
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3409

3410
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3411

3412
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3413
    if (pVgRaw == NULL) {
×
3414
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3415
      if (terrno != 0) code = terrno;
×
3416
      TAOS_RETURN(code);
×
3417
    }
3418
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3419
      sdbFreeRaw(pVgRaw);
×
3420
      TAOS_RETURN(code);
×
3421
    }
3422
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3423
    if (code != 0) {
×
3424
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3425
             __LINE__);
3426
      TAOS_RETURN(code);
×
3427
    }
3428
  } else {
3429
    return -1;
×
3430
  }
3431

3432
  mndSortVnodeGid(&newVgroup);
×
3433

3434
  {
3435
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3436
    if (pVgRaw == NULL) {
×
3437
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3438
      if (terrno != 0) code = terrno;
×
3439
      TAOS_RETURN(code);
×
3440
    }
3441
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3442
      sdbFreeRaw(pVgRaw);
×
3443
      TAOS_RETURN(code);
×
3444
    }
3445
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3446
    if (code != 0) {
×
3447
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3448
             __LINE__);
3449
      TAOS_RETURN(code);
×
3450
    }
3451
  }
3452

3453
  TAOS_RETURN(code);
×
3454
}
3455

3456
/* Mark the replica hosted on pDnode as learner and every other replica as voter. */
static void mndRestoreMarkLearner(SVgObj *pVg, const SDnodeObj *pDnode) {
  for (int i = 0; i < pVg->replica; i++) {
    SVnodeGid *pGid = &pVg->vnodeGid[i];
    pGid->nodeRole = (pGid->dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
  }
}

/* Mark every replica of the vgroup as voter. */
static void mndRestoreMarkAllVoters(SVgObj *pVg) {
  for (int i = 0; i < pVg->replica; i++) {
    pVg->vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
  }
}

/*
 * Build the actions that restore the vnode of `pVgroup` hosted on `pDnode`.
 * All role changes are applied to a local copy of the vgroup.
 *
 * - 1 replica: create the vnode again as a voter.
 * - 2 replicas: alter the type on pAnotherDnode with the restored replica
 *   marked learner, create the vnode on pDnode, then alter the type on both
 *   dnodes with every replica marked voter.
 * - 3 replicas: create the vnode on pDnode as a learner, then alter its type
 *   with every replica marked voter.
 * - Any other replica count: no vnode action is added.
 *
 * In every case the local copy is finally appended to the commit log with
 * status SDB_STATUS_READY.
 *
 * Returns 0 on success or the first failing step's error code.
 */
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  switch (newVgroup.replica) {
    case 1: {
      // Slot of the replica on pDnode; stays 0 when nothing matches.
      int selected = 0;
      for (int i = 0; i < newVgroup.replica; i++) {
        if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
          selected = i;
        }
      }
      mndRestoreMarkAllVoters(&newVgroup);
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]));
      break;
    }

    case 2:
      mndRestoreMarkLearner(&newVgroup, pDnode);
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

      // Roles are assigned again before the next action, as in every stage.
      mndRestoreMarkLearner(&newVgroup, pDnode);
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      mndRestoreMarkAllVoters(&newVgroup);
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
      break;

    case 3:
      mndRestoreMarkLearner(&newVgroup, pDnode);
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      mndRestoreMarkAllVoters(&newVgroup);
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      break;

    default:
      break;
  }

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
  if (pVgRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3535

3536
/*
 * Placeholder: adds nothing to the transaction and always reports success.
 * None of the parameters are used.
 */
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;

  const int32_t code = 0;
  return code;
}
3539

3540
// Callback that appends an encoded raw record to a transaction. The status
// helpers below pick mndTransAppendCommitlog or mndTransAppendRedolog through it.
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3541

3542
/*
 * Encode the vgroup and append it to the transaction with the given sdb
 * status.
 *
 * pTrans   - transaction that receives the raw record
 * pVg      - vgroup to encode
 * vgStatus - status stored on the raw record
 * stage    - TRN_STAGE_COMMIT_ACTION selects the commit log; any other stage
 *            selects the redo log
 *
 * Ownership: the raw record belongs to this function until the append
 * succeeds. From then on the transaction holds it, so it must not be freed
 * here even when setting the status fails. Freeing it at that point would
 * leave a dangling pointer inside the transaction.
 *
 * Returns 0 on success, otherwise the encode / append / set-status error.
 */
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // Append failed: the record is still ours, release it.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // The transaction now owns pRaw; report a status failure without freeing.
  code = sdbSetRawStatus(pRaw, vgStatus);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3563

3564
/*
 * Encode the database object and append it to the transaction with the given
 * sdb status.
 *
 * pTrans   - transaction that receives the raw record
 * pDb      - database object to encode
 * dbStatus - status stored on the raw record
 * stage    - TRN_STAGE_COMMIT_ACTION selects the commit log; any other stage
 *            selects the redo log
 *
 * Ownership: the raw record belongs to this function until the append
 * succeeds. From then on the transaction holds it, so it must not be freed
 * here even when setting the status fails. Freeing it at that point would
 * leave a dangling pointer inside the transaction.
 *
 * Returns 0 on success, otherwise the encode / append / set-status error.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // Append failed: the record is still ours, release it.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // The transaction now owns pRaw; report a status failure without freeing.
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3585

3586
/*
 * Build and prepare the "split-vgroup" transaction that replaces pVgroup by
 * two new single-replica vgroups, each covering one half of the original
 * hash range.
 *
 * Steps, in the order they are queued on the transaction:
 *   1. bring the source vgroup to two vnodes (add one when replica == 1,
 *      drop one when replica == 3),
 *   2. disable writes on the remaining vnodes,
 *   3. derive newVg1 / newVg2 (one vnode each, new vgIds, split hash range),
 *   4. redo logs: both new vgroups READY, source vgroup DROPPED, db updated,
 *   5. restore the db's configured replica count on each new vgroup,
 *   6. commit logs mirroring step 4, then mndTransPrepare.
 *
 * Returns 0 when the transaction was prepared, otherwise an error code.
 * Rejected with TSDB_CODE_OPS_NOT_SUPPORT when shared storage is enabled.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  // The dnode array is handed to the add/remove-vnode helpers below; fail
  // early with a proper error code instead of passing NULL down.
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // NOTE: databases with an arbitrator are not rejected here; the former
  // check for pDb->cfg.withArbitrator is intentionally disabled.

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  // Work on a private copy so the sdb object is untouched until commit.
  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  if (newVg1.replica == 1) {
    // Grow to two vnodes: the new vnode joins as learner first and is
    // promoted to voter afterwards.
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    // Shrink to two vnodes by dropping one replica.
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // Any other replica count falls through without adjustment.
  }

  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // newVg1 keeps vnode 0 and the lower half of the hash range;
  // newVg2 takes vnode 1 and the upper half.
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  // Iterate over newVg2's own replica count (was newVg1.replica).
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // The shallow copy shares pDb's array; duplicate it so that the
    // taosArrayDestroy in _OVER only releases our private copy.
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3748

3749
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

/* Split-vgroup message handler: forwards the request to mndProcessSplitVgroupMsgImp. */
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t code = mndProcessSplitVgroupMsgImp(pReq);
  return code;
}

#ifndef TD_ENTERPRISE
/* Stub used when TD_ENTERPRISE is not defined: does nothing and returns 0. */
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
3756

3757
/*
 * Queue on pTrans the actions that move one replica of pVgroup from dnode
 * pSrc to dnode pDst, followed by a commit log holding the resulting vgroup
 * object with status SDB_STATUS_READY.
 *
 * The sdb object itself is not modified; all changes go to a local copy.
 * Returns 0 on success or the first error encountered.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t  code = 0;
  int32_t  idx = 0;
  SSdbRaw *pCommitRaw = NULL;
  SVgObj   movedVg = {0};

  memcpy(&movedVg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", movedVg.vgId, movedVg.replica);
  for (idx = 0; idx < movedVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
  }

  // First add the replica on the destination, then remove the one on the source.
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &movedVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &movedVg, pSrc->id));

  pCommitRaw = mndVgroupActionEncode(&movedVg);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // Append failed, so the record is still ours to release.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", movedVg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", movedVg.vgId, movedVg.replica);
  for (idx = 0; idx < movedVg.replica; ++idx) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
  }

  TAOS_RETURN(code);
}
3794

3795
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
15,394✔
3796
                                            SHashObj *pBalancedVgroups) {
3797
  void   *pIter = NULL;
15,394✔
3798
  int32_t code = -1;
15,394✔
3799
  SSdb   *pSdb = pMnode->pSdb;
15,394✔
3800

3801
  while (1) {
9,490✔
3802
    SVgObj *pVgroup = NULL;
24,884✔
3803
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
24,884✔
3804
    if (pIter == NULL) break;
24,884✔
3805
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
24,884✔
3806
      sdbRelease(pSdb, pVgroup);
8,755✔
3807
      continue;
8,755✔
3808
    }
3809

3810
    bool existInSrc = false;
16,129✔
3811
    bool existInDst = false;
16,129✔
3812
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
46,838✔
3813
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
30,709✔
3814
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
30,709✔
3815
      if (pGid->dnodeId == pDst->id) existInDst = true;
30,709✔
3816
    }
3817

3818
    if (!existInSrc || existInDst) {
16,129✔
3819
      sdbRelease(pSdb, pVgroup);
735✔
3820
      continue;
735✔
3821
    }
3822

3823
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
15,394✔
3824
    if (pDb == NULL) {
15,394✔
3825
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3826
      if (terrno != 0) code = terrno;
×
3827
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3828
      goto _OUT;
×
3829
    }
3830

3831
    if (pDb->cfg.withArbitrator) {
15,394✔
3832
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3833
      goto _OUT;
×
3834
    }
3835

3836
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
15,394✔
3837
    if (code == 0) {
15,394✔
3838
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
15,394✔
3839
    }
3840

3841
  _OUT:
15,394✔
3842
    mndReleaseDb(pMnode, pDb);
15,394✔
3843
    sdbRelease(pSdb, pVgroup);
15,394✔
3844
    sdbCancelFetch(pSdb, pIter);
15,394✔
3845
    break;
15,394✔
3846
  }
3847

3848
  return code;
15,394✔
3849
}
3850

3851
/*
 * Build the "balance-vgroup" transaction.
 *
 * Repeatedly sorts pArray with mndCompareDnodeVnodes, takes the last entry as
 * source and the first as destination, and queues one vgroup move between
 * them for as long as the source score (with one vnode fewer) is not below
 * the destination score (with one vnode more). The numOfVnodes counters of
 * the array entries are updated after each queued move.
 *
 * pArray must hold at least two SDnodeObj entries (the caller checks this).
 *
 * Returns 0 when nothing had to be moved, TSDB_CODE_ACTION_IN_PROGRESS when
 * the transaction was prepared, otherwise an error code.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    // Report a real error code instead of the -1 placeholder.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // Argument order matches the labels: vnodes, others, support, score.
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Scores as they would be after moving one vnode from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Keep the failure code: `code` may still be 0 from the last successful
    // move, which would otherwise be returned as success.
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3919

3920
/*
 * Handler for the balance-vgroup request.
 *
 * Rejects the request when it cannot be deserialized, when the user lacks
 * MND_OPER_BALANCE_VGROUP, when any SDB_MOUNT entry exists, or when any dnode
 * is offline. Otherwise it builds the dnode array and, if it holds at least
 * two dnodes, delegates to mndBalanceVgroup. An audit record is written once
 * the balance attempt has been made, whatever its result.
 *
 * Returns 0 when nothing had to be done, TSDB_CODE_ACTION_IN_PROGRESS when a
 * transaction was started, otherwise an error code.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // Every dnode must be online before vnodes are moved around.
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Log the reason from `code`; terrno has not been set to it here.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
3983

3984
/* True when pVgroup is not flagged as a TSMA vgroup and its dbUid equals dbUid. */
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
92,832,558✔
3985

3986
/* True when one of the first `replica` vnodes of pVgroup is placed on dnode dnodeId. */
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool found = false;
  for (int i = 0; i < pVgroup->replica && !found; i++) {
    found = (pVgroup->vnodeGid[i].dnodeId == dnodeId);
  }
  return found;
}
3992

3993
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
131,365✔
3994
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
3995
                                     ETriggerType triggerType) {
3996
  SCompactVnodeReq compactReq = {0};
131,365✔
3997
  compactReq.dbUid = pDb->uid;
131,365✔
3998
  compactReq.compactStartTime = compactTs;
131,365✔
3999
  compactReq.tw = tw;
131,365✔
4000
  compactReq.metaOnly = metaOnly;
131,365✔
4001
  compactReq.force = force;
131,365✔
4002
  compactReq.optrType = type;
131,365✔
4003
  compactReq.triggerType = triggerType;
131,365✔
4004
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
131,365✔
4005

4006
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
131,365✔
4007
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
131,365✔
4008
  if (contLen < 0) {
131,365✔
4009
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4010
    return NULL;
×
4011
  }
4012
  contLen += sizeof(SMsgHead);
131,365✔
4013

4014
  void *pReq = taosMemoryMalloc(contLen);
131,365✔
4015
  if (pReq == NULL) {
131,365✔
4016
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4017
    return NULL;
×
4018
  }
4019

4020
  SMsgHead *pHead = pReq;
131,365✔
4021
  pHead->contLen = htonl(contLen);
131,365✔
4022
  pHead->vgId = htonl(pVgroup->vgId);
131,365✔
4023

4024
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
131,365✔
4025
    taosMemoryFree(pReq);
×
4026
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4027
    return NULL;
×
4028
  }
4029
  *pContLen = contLen;
131,365✔
4030
  return pReq;
131,365✔
4031
}
4032

4033
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
64,317✔
4034
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4035
                                        ETriggerType triggerType) {
4036
  int32_t      code = 0;
64,317✔
4037
  STransAction action = {0};
64,317✔
4038
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
64,317✔
4039

4040
  int32_t contLen = 0;
64,317✔
4041
  void   *pReq =
4042
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
64,317✔
4043
  if (pReq == NULL) {
64,317✔
4044
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4045
    if (terrno != 0) code = terrno;
×
4046
    TAOS_RETURN(code);
×
4047
  }
4048

4049
  action.pCont = pReq;
64,317✔
4050
  action.contLen = contLen;
64,317✔
4051
  action.msgType = TDMT_VND_COMPACT;
64,317✔
4052

4053
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
64,317✔
4054
    taosMemoryFree(pReq);
×
4055
    TAOS_RETURN(code);
×
4056
  }
4057

4058
  TAOS_RETURN(code);
64,317✔
4059
}
4060

4061
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
64,317✔
4062
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4063
                                    ETriggerType triggerType) {
4064
  TAOS_CHECK_RETURN(
64,317✔
4065
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4066
  return 0;
64,317✔
4067
}
4068

4069
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
67,048✔
4070
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4071
  int32_t      code = 0;
67,048✔
4072
  STransAction action = {0};
67,048✔
4073
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
67,048✔
4074

4075
  int32_t contLen = 0;
67,048✔
4076
  // reuse SCompactVnodeReq as SVTrimDbReq
4077
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
67,048✔
4078
  if (pReq == NULL) {
67,048✔
4079
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4080
    if (terrno != 0) code = terrno;
×
4081
    TAOS_RETURN(code);
×
4082
  }
4083

4084
  action.pCont = pReq;
67,048✔
4085
  action.contLen = contLen;
67,048✔
4086
  action.msgType = TDMT_VND_TRIM;
67,048✔
4087

4088
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
67,048✔
4089
    taosMemoryFree(pReq);
×
4090
    TAOS_RETURN(code);
×
4091
  }
4092

4093
  TAOS_RETURN(code);
67,048✔
4094
}
4095

4096
/*
 * Handler for the "set vgroup keep version" request.
 *
 * Creates a serial transaction that
 *   - appends a commit log with a copy of the vgroup whose keepVersion is set
 *     to the requested value and keepVersionTime to the current time, and
 *   - appends one TDMT_VND_SET_KEEP_VERSION redo action per replica, sent to
 *     the dnode hosting that replica (TSDB_CODE_VND_STOPPED is acceptable).
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 * otherwise an error code. The acquired vgroup reference and the transaction
 * are released on every path.
 */
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup (released in _OVER unless released earlier)
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Update SVgObj's keepVersion on a private copy of the vgroup
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Add commit log for the SDB vgroup update
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // Not appended, so the record is still ours to release.
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    // pCommitRaw is already referenced by the transaction; freeing it here
    // would leave the transaction with a dangling pointer.
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + (int32_t)sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    // Use the returned code; terrno may be unset (0) on failure.
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // The vgroup is no longer needed once all actions are queued.
  mndReleaseVgroup(pMnode, pVgroup);
  pVgroup = NULL;

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc