• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4871

04 Dec 2025 01:55AM UTC coverage: 64.654% (+0.1%) from 64.545%
#4871

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

869 of 2219 new or added lines in 36 files covered. (39.16%)

441 existing lines in 120 files now uncovered.

159620 of 246882 relevant lines covered (64.65%)

110922946.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.83
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndArbGroup.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndStream.h"
26
#include "mndTopic.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "tmisce.h"
31

32
#define VGROUP_VER_NUMBER   1
33
#define VGROUP_RESERVE_SIZE 60
34

35
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
36
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
38
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
39

40
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
42
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
44

45
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
46
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
49
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
50

51
/*
 * Register the vgroup module with the mnode.
 * Registers message handlers, show-table handlers and the SDB_VGROUP sdb table.
 * Returns the result of sdbSetTable.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  /* Responses to vnode/dnode/sync requests sent by transactions are all routed to mndTransProcessRsp. */
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SET_KEEP_VERSION_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SCAN_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_ELECTBASELINE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);

  /* Vgroup management requests handled by this module. */
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  /* Show-table handlers: vgroups and vnodes. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  /* Finally hook the vgroup object type into sdb. */
  SSdbTable vgTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };
  return sdbSetTable(pMnode->pSdb, vgTable);
}
94

95
/* Module teardown counterpart of mndInitVgroup. There is currently nothing to release here. */
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}
96

97
/*
 * sdb encode callback: serialize a vgroup into an SDB_VGROUP raw record.
 *
 * The order of the SDB_SET_* calls below defines the record layout and must match
 * mndVgroupActionDecode. Only dnodeId is persisted per vnode.
 *
 * Returns the raw record, or NULL with terrno set (the raw is freed on failure).
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  /* fixed header */
  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)

  /* replica count followed by one dnodeId per vnode */
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[r].dnodeId, _OVER)
  }

  /* fields appended after the original layout; decode reads them conditionally */
  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
139

140
/*
 * sdb decode callback: rebuild an SVgObj row from a raw record written by mndVgroupActionEncode.
 *
 * The fields after the dnode list (syncConfChangeVer, mountVgId, keepVersion, keepVersionTime)
 * were appended to the layout without bumping VGROUP_VER_NUMBER. Each one is therefore read only
 * when the remaining record is long enough to hold THAT field plus the trailing reserve.
 * With one guard per field, a record written before a field existed leaves that field at its
 * sdbAllocRow value (presumably zero — verify sdbAllocRow zero-fills). Without this, later fields
 * would shift into the wrong slots.
 *
 * Returns the allocated row, or NULL with terrno set on failure.
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;
  int8_t   sver = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
    /* a single-replica vgroup has no election, so its only vnode is marked leader */
    if (pVgroup->replica == 1) {
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
    }
  }

  /* optional trailing fields: each guarded by its own size so older records decode correctly */
  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }
  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->mountVgId, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersion, _OVER)
  }
  if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER)
  }
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}
203

204
/*
 * sdb validate callback for a vgroup raw record about to be created by transaction pTrans.
 *
 * Decodes the raw and rejects it when its vgId is below sdbGetMaxId(SDB_VGROUP), which is
 * treated here as "already in use". The decoded temporary row is always released.
 *
 * Returns 0 on success. Otherwise returns an error code (also stored in terrno by TAOS_RETURN).
 * Previously the in-use case returned the bare value -1, which is not a valid error code.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb    *pSdb = pMnode->pSdb;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;
  int32_t  code = 0;

  pRow = mndVgroupActionDecode(pRaw);
  if (pRow == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (maxVgId > pVgroup->vgId) {
    code = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
    goto _OVER;
  }

_OVER:
  if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
235

236
/* sdb insert callback: nothing to do beyond a trace line; always succeeds. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  int32_t vgId = pVgroup->vgId;
  mTrace("vgId:%d, perform insert action, row:%p", vgId, pVgroup);
  return 0;
}
240

241
/* sdb delete callback: the row owns nothing to free here, so it only traces; always succeeds. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  int32_t vgId = pVgroup->vgId;
  mTrace("vgId:%d, perform delete action, row:%p", vgId, pVgroup);
  return 0;
}
245

246
/*
 * sdb update callback: merge the freshly decoded row pNew into the live row pOld.
 *
 * The encoder persists only dnodeId per vnode. The runtime sync/status fields (syncState,
 * syncRestore, ...) are carried from pOld into pNew, matched by dnodeId, before pNew's vnode
 * array is copied over pOld. That way those fields survive the update. Runtime statistics are
 * also copied from pOld into pNew.
 *
 * Always returns 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  /* Capture the old vnode count before pOld->replica is overwritten below. The merge loop must
   * scan the OLD vnode list. Scanning only pNew->replica entries would skip old vnodes whenever
   * the replica count shrinks (e.g. 3 -> 1), and their sync state would be lost. */
  int32_t oldReplica = pOld->replica;

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;

  for (int32_t i = 0; i < pNew->replica; ++i) {
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
    for (int32_t j = 0; j < oldReplica; ++j) {
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
        pNewGid->syncState = pOldGid->syncState;
        pNewGid->syncRestore = pOldGid->syncRestore;
        pNewGid->syncCanRead = pOldGid->syncCanRead;
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
        pNewGid->bufferSegmentUsed = pOldGid->bufferSegmentUsed;
        pNewGid->bufferSegmentSize = pOldGid->bufferSegmentSize;
      }
    }
  }

  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  /* copies the whole fixed-size vnode array, including slots beyond replica */
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);
  return 0;
}
282

283
/*
 * Acquire a reference to the vgroup with the given id from sdb.
 * Returns NULL with terrno set on failure. A missing object is reported as
 * TSDB_CODE_MND_VGROUP_NOT_EXIST; other sdb errors are left untouched.
 * A non-NULL result must be released with mndReleaseVgroup.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pFound = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
291

292
/* Release a reference obtained from mndAcquireVgroup. */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pMnode->pSdb, pVgroup); }
296

297
/*
 * Build a serialized SCreateVnodeReq asking dnode pDnode to create its vnode of pVgroup.
 *
 * Voters go to replicas[] and learners to learnerReplicas[]. selfIndex / learnerSelfIndex are
 * the position of pDnode inside whichever array holds it. Side effect: pVgroup->syncConfChangeVer
 * is incremented, even if building later fails.
 *
 * On success returns a taosMemoryMalloc'd buffer (caller frees) and stores its length in
 * *pContLen. Returns NULL with terrno set on failure.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  int32_t changeVer = ++(pVgroup->syncConfChangeVer);

  SCreateVnodeReq createReq = {
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
      .vgVersion = pVgroup->version,
      .numOfStables = pDb->cfg.numOfStables,
      .buffer = pDb->cfg.buffer,
      .pageSize = pDb->cfg.pageSize,
      .pages = pDb->cfg.pages,
      .cacheLastSize = pDb->cfg.cacheLastSize,
      .daysPerFile = pDb->cfg.daysPerFile,
      .daysToKeep0 = pDb->cfg.daysToKeep0,
      .daysToKeep1 = pDb->cfg.daysToKeep1,
      .daysToKeep2 = pDb->cfg.daysToKeep2,
      .keepTimeOffset = pDb->cfg.keepTimeOffset,
      .ssChunkSize = pDb->cfg.ssChunkSize,
      .ssKeepLocal = pDb->cfg.ssKeepLocal,
      .ssCompact = pDb->cfg.ssCompact,
      .minRows = pDb->cfg.minRows,
      .maxRows = pDb->cfg.maxRows,
      .walFsyncPeriod = pDb->cfg.walFsyncPeriod,
      .walLevel = pDb->cfg.walLevel,
      .precision = pDb->cfg.precision,
      .compression = pDb->cfg.compression,
      .strict = pDb->cfg.strict,
      .cacheLast = pDb->cfg.cacheLast,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .hashBegin = pVgroup->hashBegin,
      .hashEnd = pVgroup->hashEnd,
      .hashMethod = pDb->cfg.hashMethod,
      .numOfRetensions = pDb->cfg.numOfRetensions,
      .pRetensions = pDb->cfg.pRetensions,
      .isTsma = pVgroup->isTsma,
      .pTsma = pVgroup->pTsma,
      .walRetentionPeriod = pDb->cfg.walRetentionPeriod,
      .walRetentionSize = pDb->cfg.walRetentionSize,
      .walRollPeriod = pDb->cfg.walRollPeriod,
      .walSegmentSize = pDb->cfg.walSegmentSize,
      .sstTrigger = pDb->cfg.sstTrigger,
      .hashPrefix = pDb->cfg.hashPrefix,
      .hashSuffix = pDb->cfg.hashSuffix,
      .tsdbPageSize = pDb->cfg.tsdbPageSize,
      .changeVersion = changeVer,
      .encryptAlgorithm = pDb->cfg.encryptAlgorithm,
  };
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica  *pReplica = isVoter ? &createReq.replicas[createReq.replica]
                                  : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }
    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    /* record the target dnode's slot before advancing the corresponding counter */
    bool isSelf = (pDnode->id == pVgid->dnodeId);
    if (isVoter) {
      if (isSelf) createReq.selfIndex = createReq.replica;
      createReq.replica++;
    } else {
      if (isSelf) createReq.learnerSelfIndex = createReq.learnerReplica;
      createReq.learnerReplica++;
    }
  }

  /* the target dnode must host one of the vnodes of this vgroup */
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  createReq.changeVersion = pVgroup->syncConfChangeVer;

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t serRet = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (serRet < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
426

427
/*
 * Build an SAlterVnodeConfigReq carrying pDb's current config for vgroup pVgroup.
 *
 * Layout: an SMsgHead (contLen and vgId in network byte order) followed by the serialized body.
 * On success returns a taosMemoryMalloc'd buffer (caller frees) and stores the total length,
 * header included, in *pContLen. Returns NULL with terrno set on failure.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  /* The body starts after the header, so only bodyLen bytes are available to the serializer.
   * Passing contLen here would overstate the buffer by sizeof(SMsgHead). */
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
477

478
/*
 * Build a serialized SAlterVnodeReplicaReq for the vnode of pVgroup hosted on dnode dnodeId.
 *
 * Voters are placed in replicas[] and learners in learnerReplicas[]. selfIndex / learnerSelfIndex
 * are the target's position WITHIN the array that holds it, as in mndBuildCreateVnodeReq.
 * Previously they were set to the vnodeGid index v. That index is out of range for a learner
 * target, and wrong whenever learners precede voters in vnodeGid.
 * Side effect: pVgroup->syncConfChangeVer is incremented.
 *
 * On success returns a taosMemoryMalloc'd buffer (caller frees) and stores its length in
 * *pContLen. Returns NULL with terrno set on failure.
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isSelf = (dnodeId == pVgid->dnodeId);
    SReplica  *pReplica = NULL;

    if (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      if (isSelf) alterReq.selfIndex = alterReq.replica;
      pReplica = &alterReq.replicas[alterReq.replica];
      alterReq.replica++;
    } else {
      if (isSelf) alterReq.learnerSelfIndex = alterReq.learnerReplica;
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
      alterReq.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  /* dnodeId must host one of the vnodes of this vgroup */
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
560

561
/*
 * Build a serialized SCheckLearnCatchupReq for the vnode of pVgroup hosted on dnode dnodeId.
 * The request is serialized with tSerializeSAlterVnodeReplicaReq, as the original code did.
 *
 * selfIndex / learnerSelfIndex are the target's position within replicas[] / learnerReplicas[]
 * respectively. Previously they were set to the vnodeGid index v, which overruns learnerReplicas[]
 * for a learner target. The log lines previously claimed "alter vnode req" and now name this request.
 *
 * On success returns a taosMemoryMalloc'd buffer (caller frees) and stores its length in
 * *pContLen. Returns NULL with terrno set on failure.
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isSelf = (dnodeId == pVgid->dnodeId);
    SReplica  *pReplica = NULL;

    if (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      if (isSelf) req.selfIndex = req.replica;
      pReplica = &req.replicas[req.replica];
      req.replica++;
    } else {
      if (isSelf) req.learnerSelfIndex = req.learnerReplica;
      pReplica = &req.learnerReplicas[req.learnerReplica];
      req.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

  mInfo(
      "vgId:%d, build check learner catchup req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d "
      "strict:%d",
      req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  /* dnodeId must host one of the vnodes of this vgroup */
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize check learner catchup req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
638

639
/*
 * Build a serialized SDisableVnodeWriteReq (disable = 1) for vgroup vgId.
 * On success returns a taosMemoryMalloc'd buffer (caller frees) and stores its length in
 * *pContLen. Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {.vgId = vgId, .disable = 1};
  int32_t               len = 0;
  void                 *pBuf = NULL;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  len = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
  if (len < 0) goto _OVER;

  pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) goto _OVER;

  if (tSerializeSDisableVnodeWriteReq(pBuf, len, &disableReq) < 0) {
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
    taosMemoryFree(pBuf);
    goto _OVER;
  }

  *pContLen = len;
  return pBuf;

_OVER:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
667

668
/*
 * Build a serialized SAlterVnodeHashRangeReq telling source vgroup srcVgId to take the id and
 * hash range of pVgroup. Side effect: pVgroup->syncConfChangeVer is incremented.
 * On success returns a taosMemoryMalloc'd buffer (caller frees) and stores its length in
 * *pContLen. Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq rangeReq = {
      .srcVgId = srcVgId,
      .dstVgId = pVgroup->vgId,
      .hashBegin = pVgroup->hashBegin,
      .hashEnd = pVgroup->hashEnd,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };
  int32_t len = 0;
  void   *pBuf = NULL;

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  len = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &rangeReq);
  if (len < 0) goto _OVER;

  pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) goto _OVER;

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, len, &rangeReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
    taosMemoryFree(pBuf);
    goto _OVER;
  }

  *pContLen = len;
  return pBuf;

_OVER:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
700

701
// Serialize an SDropVnodeReq asking dnode pDnode to drop the vnode of pVgroup (database pDb).
// Returns a taosMemoryMalloc'ed buffer (caller frees) and stores its length in *pContLen;
// on failure returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);

  // First pass computes the encoded size; a negative size or a failed allocation both report OOM.
  int32_t reqLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  void   *pBuf = (reqLen < 0) ? NULL : taosMemoryMalloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, reqLen, &dropReq) < 0) {
    mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = reqLen;
  return pBuf;
}
730

731
// sdbTraverse callback: zero the per-dnode counters (numOfVnodes, numOfOtherNodes) that the
// dnode-array builders recompute afterwards. Always returns true.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pTarget = (SDnodeObj *)pObj;
  pTarget->numOfOtherNodes = 0;
  pTarget->numOfVnodes = 0;
  return true;
}
737

738
// sdbTraverse callback that collects candidate dnodes for vnode placement.
//   p1: SArray of SDnodeObj receiving the candidates
//   p2: int32_t* dnode id to exclude
//   p3: optional SArray of int32_t dnode ids; when non-NULL, only listed dnodes qualify
//       (a non-NULL but empty list selects nothing, see TS-6191)
// For every dnode that passes the filters, numOfVnodes and memUsed are refreshed, and
// numOfOtherNodes is bumped when the dnode also hosts an mnode. Online dnodes with
// numOfSupportVnodes > 0 are then pushed to p1.
// Returns false only when the push fails.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pArray = p1;
  int32_t    exceptDnodeId = *(int32_t *)p2;
  SArray    *dnodeList = p3;

  if (pDnode->id == exceptDnodeId) {
    return true;
  }

  if (dnodeList != NULL) {
    int32_t listSize = taosArrayGetSize(dnodeList);
    bool    found = false;
    for (int32_t idx = 0; idx < listSize && !found; ++idx) {
      found = (*(int32_t *)taosArrayGet(dnodeList, idx) == pDnode->id);
    }
    // Covers both "not listed" and "empty list" (TS-6191).
    if (!found) {
      return true;
    }
  }

  int64_t nowMs = taosGetTimestampMs();
  bool    isOnline = mndIsDnodeOnline(pDnode, nowMs);
  bool    hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, hostsMnode, isOnline, pDnode->memAvail, pDnode->memUsed);

  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (!isOnline || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pArray, pDnode) != NULL;
}
784

785
// Return true when dnodeId appears in dnodeList (an SArray of int32_t dnode ids).
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t total = taosArrayGetSize(dnodeList);
  int32_t idx = 0;
  while (idx < total && *(int32_t *)TARRAY_GET_ELEM(dnodeList, idx) != dnodeId) {
    ++idx;
  }
  return idx < total;
}
795

796
#ifdef TD_ENTERPRISE
797
// Load score used by the dual-replica selection: (vnodes + ratio * other nodes + additionDnodes)
// divided by numOfSupportVnodes. The score is negated for dnodes that already host vnodes,
// so that an ascending sort puts them first.
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = weightedLoad / pDnode->numOfSupportVnodes;
  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
802

803
// Sort comparator for the dual-replica path: ascending mndGetDnodeScore1 (ratio 0.9),
// with ties broken by ascending dnode id.
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);
  if (lhs != rhs) {
    return lhs > rhs ? 1 : -1;
  }
  if (pDnode1->id == pDnode2->id) {
    return 0;
  }
  return pDnode1->id > pDnode2->id ? 1 : -1;
}
814

815
// sdbTraverse callback for the dual-replica path. It refreshes numOfVnodes, bumps
// numOfOtherNodes when the dnode hosts an mnode, and pushes every dnode with
// numOfSupportVnodes > 0 into the SArray p1, regardless of online state.
// Returns false only when the push fails.
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pCandidates = p1;

  bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pCandidates, pDnode) != NULL;
}
831

832
// TS-6191
833
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
1,321,059✔
834
  int32_t code = 0;
1,321,059✔
835
  if (!grantCheckDualReplicaDnodes(pMnode)) {
1,321,059✔
836
    TAOS_RETURN(code);
1,321,059✔
837
  }
838
  SSdb   *pSdb = pMnode->pSdb;
×
839
  SArray *pArray = taosArrayInit(nDnodes, sizeof(SDnodeObj));
×
840
  if (pArray == NULL) {
×
841
    TAOS_RETURN(code = terrno);
×
842
  }
843
  *ppDnodeList = pArray;
×
844

845
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
×
846
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pArray, NULL, NULL);
×
847

848
  int32_t arrSize = taosArrayGetSize(pArray);
×
849
  if (arrSize <= 0) {
×
850
    TAOS_RETURN(code);
×
851
  }
852
  if (arrSize > 1) taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes1);
×
853

854
  int32_t dnodeListSize = taosArrayGetSize(dnodeList);
×
855
  if (dnodeListSize <= 0) {
×
856
    if (arrSize > 2) taosArrayRemoveBatch(pArray, 2, arrSize - 2, NULL);
×
857
  } else {
858
    int32_t nDnodesWithVnodes = 0;
×
859
    for (int32_t i = 0; i < arrSize; ++i) {
×
860
      SDnodeObj *pDnode = TARRAY_GET_ELEM(pArray, i);
×
861
      if (pDnode->numOfVnodes <= 0) {
×
862
        break;
×
863
      }
864
      ++nDnodesWithVnodes;
×
865
    }
866
    int32_t dnodeId = -1;
×
867
    if (nDnodesWithVnodes == 1) {
×
868
      dnodeId = ((SDnodeObj *)TARRAY_GET_ELEM(pArray, 0))->id;
×
869
    } else if (nDnodesWithVnodes >= 2) {
×
870
      // must select the dnodes from the 1st 2 dnodes
871
      taosArrayRemoveBatch(pArray, 2, arrSize - 2, NULL);
×
872
    }
873
    for (int32_t i = 0; i < TARRAY_SIZE(pArray);) {
×
874
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
875
      if (!isDnodeInList(dnodeList, pDnode->id)) {
×
876
        taosArrayRemove(pArray, i);
×
877
        continue;
×
878
      }
879
      ++i;
×
880
    }
881
    if (nDnodesWithVnodes == 1) {
×
882
      SDnodeObj *pDnode = taosArrayGet(pArray, 0);
×
883
      if (pDnode && (pDnode->id != dnodeId)) {  // the first dnode is not in dnodeList, remove the last element
×
884
        taosArrayRemove(pArray, taosArrayGetSize(pArray) - 1);
×
885
      }
886
    }
887
  }
888

889
  TAOS_RETURN(code);
×
890
}
891
#endif
892

893
// Build an SArray of SDnodeObj copies for the online dnodes that can host vnodes. The dnode
// exceptDnodeId is skipped, and the result is limited to dnodeList when that list is non-empty.
// In TD_ENTERPRISE builds the dual-replica grant check may narrow the filter further.
// Returns the array (caller destroys it), or NULL with terrno set on failure.
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
  SArray *pDualList = NULL;

  SArray *pResult = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // An empty caller list means "no filter".
  SArray *pFilter = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, numOfDnodes, pFilter, &pDualList) != 0) {
    taosArrayDestroy(pResult);
    return NULL;
  }
#endif
  SArray *pEffective = (pDualList != NULL) ? pDualList : pFilter;

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId, pEffective);

  int32_t count = (int32_t)taosArrayGetSize(pResult);
  mDebug("build %d dnodes array", count);
  for (int32_t i = 0; i < count; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pResult, i);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  taosArrayDestroy(pDualList);
  return pResult;
}
924

925
// Three-way comparator on dnode ids: -1, 0 or 1 as *dnode1Id is less than, equal to or greater than *dnode2Id.
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  int32_t lhs = *dnode1Id;
  int32_t rhs = *dnode2Id;
  // Each relational test yields 0 or 1, so the difference is exactly -1, 0 or 1.
  return (lhs > rhs) - (lhs < rhs);
}
931

932
// Placement load score: (vnodes + ratio * other nodes + additionDnodes) / numOfSupportVnodes.
// Lower means less loaded.
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  return weightedLoad / pDnode->numOfSupportVnodes;
}
936

937
// Sort comparator: ascending mndGetDnodeScore with ratio 0.9 (least loaded first).
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);
  if (lhs > rhs) return 1;
  if (lhs == rhs) return 0;
  return -1;
}
945

946
// Order pVgroup->vnodeGid[0 .. replica) by ascending dnodeId.
// The sort is stable: entries with equal dnodeId keep their relative order.
void mndSortVnodeGid(SVgObj *pVgroup) {
  for (int32_t next = 1; next < pVgroup->replica; ++next) {
    SVnodeGid moving = pVgroup->vnodeGid[next];
    int32_t   slot = next;
    // Shift strictly-larger entries one slot right, then drop `moving` into the gap.
    while (slot > 0 && pVgroup->vnodeGid[slot - 1].dnodeId > moving.dnodeId) {
      pVgroup->vnodeGid[slot] = pVgroup->vnodeGid[slot - 1];
      --slot;
    }
    pVgroup->vnodeGid[slot] = moving;
  }
}
2,544,284✔
955

956
// Assign pVgroup's `replica` vnodes to the least-loaded dnodes in pArray.
// pArray is sorted in place by mndCompareDnodeVnodes, and the first `replica` entries are used.
// Each chosen dnode's memUsed and numOfVnodes are increased in pArray, so later calls that reuse
// the array see this allocation. On success, vnodeGid is sorted by dnodeId and 0 is returned.
// Failure codes:
//   - TSDB_CODE_MND_NO_ENOUGH_DNODES: too few candidate dnodes;
//   - TSDB_CODE_MND_NO_ENOUGH_VNODES: a chosen dnode is at its vnode limit;
//   - TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE: a chosen dnode lacks memory.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  int32_t numCandidates = (int32_t)taosArrayGetSize(pArray);
  mDebug("start to sort %d dnodes", numCandidates);
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < numCandidates; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9));
  }

  if (numCandidates < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId,
           numCandidates, pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    pVgid->dnodeId = pDnode->id;
    // A single-replica vgroup starts as leader; multi-replica members start as followers.
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1005

1006
// Allocate a single-replica TSMA vgroup for database pDb into *pVgroup.
// It takes the next vgId from the sdb and places the vnode on the least-loaded online dnode.
// Returns 0 on success, otherwise an error code (also stored in terrno).
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  // The candidate array is temporary. Release it on both paths; previously the failure path
  // leaked it and replaced the real error code with -1.
  taosArrayDestroy(pArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1032

1033
// Allocate pDb->cfg.numOfVgroups vgroups for database pDb.
// Consecutive vgIds start at max(sdbGetMaxId, 2), and the uint32 hash space is split evenly;
// the last vgroup absorbs the remainder up to UINT32_MAX. Each vgroup's replicas are placed on
// the least-loaded online dnodes, restricted to dnodeList when that list is non-empty.
// On success *ppVgroups receives a taosMemoryCalloc'ed array that the caller frees, and 0 is
// returned. On failure nothing is stored and the error code is returned (also set in terrno).
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time (previously read pVgroups[0].createdTime by mistake).
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1098

1099
// Build the endpoint set for pVgroup from the fqdn/port of each replica's dnode.
// Replicas whose dnode cannot be acquired are skipped. inUse points at the (assigned) leader when
// one is found. The result is normalized with epsetSort() before it is returned.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode == NULL) {
      continue;
    }

    bool isLeader =
        (pGid->syncState == TAOS_SYNC_STATE_LEADER) || (pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      // The leader's ep is about to be appended at position numOfEps.
      epset.inUse = epset.numOfEps;
    }

    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
    }
    mndReleaseDnode(pMnode, pDnode);
  }

  epsetSort(&epset);
  return epset;
}
1120

1121
// Look up vgroup vgId and build its endpoint set, with inUse pointing at the (assigned) leader.
// Returns an empty epset when the vgroup does not exist. Unlike mndGetVgroupEpset(), the eps are
// left in replica order (no epsetSort()).
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet  epset = {0};
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode == NULL) {
      continue;
    }

    bool isLeader =
        (pGid->syncState == TAOS_SYNC_STATE_LEADER) || (pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      epset.inUse = epset.numOfEps;
    }

    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
    }
    mndReleaseDnode(pMnode, pDnode);
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1145

1146
// SHOW VGROUPS retrieve callback: fill up to `rows` rows of pBlock from the sdb vgroup
// iterator in pShow->pIter. When pShow->db is set, only vgroups of that database are listed.
// Each row holds: vgId, db name, table count, four (dnode id, status, applied/commit) column
// triples (NULL for unused replica slots), cache usage, cached tables, isTsma, mountVgId,
// keepVersion and keepVersionTime.
// Returns the number of rows produced, or an error code.
// Every error path releases the current vgroup, cancels the iterator and releases the db
// reference through _OVER.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  int64_t curMs = taosGetTimestampMs();
  int32_t code = 0, lino = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      lino = __LINE__;
      goto _OVER;  // previously returned directly and leaked the pDb reference
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i < pVgroup->replica) {
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
        COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

        bool       exist = false;
        bool       online = false;
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
        if (pDnode != NULL) {
          exist = true;
          online = mndIsDnodeOnline(pDnode, curMs);
          mndReleaseDnode(pMnode, pDnode);
        }

        // Status text: "dropping" when the dnode no longer exists, "offline" when it is down,
        // otherwise the sync role.
        char buf1[20] = {0};
        char role[20] = "offline";
        if (!exist) {
          tstrncpy(role, "dropping", sizeof(role));
        } else if (online) {
          const char *star = "";
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
            // A leader that has not finished restoring gets "**" when syncCanRead is false and
            // "*" when it is true.
            if (!pVgroup->vnodeGid[i].syncRestore) {
              star = pVgroup->vnodeGid[i].syncCanRead ? "*" : "**";
            }
          }
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
        }
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
                 pVgroup->vnodeGid[i].syncCommitIndex);
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);
      } else {
        // Unused replica slot: dnode, status and apply/commit columns are all NULL.
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
      }
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->keepVersion, false);
    if (code != 0) {
      mError("vgId:%d, failed to set keepVersion, since %s", pVgroup->vgId, tstrerror(code));
      // previously returned directly, leaking the vgroup ref, the iterator and the db ref
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      lino = __LINE__;
      goto _OVER;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->keepVersionTime, false);
    if (code != 0) {
      mError("vgId:%d, failed to set keepVersionTime, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pShow->pIter);
      lino = __LINE__;
      goto _OVER;
    }

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1314

1315
// Abort an in-progress SDB_VGROUP iteration started by a SHOW retrieve.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1319

1320
// sdbTraverse callback. p1 is an int32_t* target dnode id and p2 an int32_t* counter.
// The counter is increased by the number of pVgroup replicas placed on that dnode.
// Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVgroup = pObj;
  int32_t  targetId = *(int32_t *)p1;
  int32_t *pCounter = (int32_t *)p2;
  int32_t  hits = 0;

  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    hits += (pVgroup->vnodeGid[r].dnodeId == targetId) ? 1 : 0;
  }
  *pCounter += hits;
  return true;
}
1333

1334
// Count the vnodes (vgroup replicas) currently placed on dnode dnodeId.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t count = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &count, NULL);
  return count;
}
1339

1340
// Estimate the memory one vnode of pVgroup needs, in bytes:
//   buffer (MB) + pages * pageSize (KB) + cacheLastSize (MB, only when cacheLast > 0).
// When pDbInput is NULL, the db is looked up by pVgroup->dbName and released afterwards.
// Returns 0 when the db cannot be found.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  bool    ownsDb = (pDbInput == NULL);
  SDbObj *pDb = ownsDb ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;
  int64_t total = 0;

  if (pDb != NULL) {
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
    total = buffer + cache + ((pDb->cfg.cacheLast > 0) ? cacheLast : 0);
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  if (ownsDb) {
    mndReleaseDb(pMnode, pDb);
  }
  return total;
}
1364

1365
// sdbTraverse callback. p1 is an int32_t* target dnode id and p2 an int64_t* accumulator.
// For every replica of pVgroup placed on that dnode, mndGetVgroupMemory() is added to the
// accumulator. Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVgroup = pObj;
  int32_t  targetId = *(int32_t *)p1;
  int64_t *pTotal = (int64_t *)p2;

  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    if (pVgroup->vnodeGid[r].dnodeId != targetId) {
      continue;
    }
    *pTotal += mndGetVgroupMemory(pMnode, NULL, pVgroup);
  }
  return true;
}
1378

1379
// Sum the estimated memory (bytes) of all vnodes placed on dnode dnodeId.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t totalBytes = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &totalBytes, NULL);
  return totalBytes;
}
1384

1385
// Format the estimated time to apply `applyCount` entries at `rate` as "H:M:S" into restoreStr.
// applyCount / rate is treated as milliseconds (it is divided by 1000 to get seconds), so rate is
// presumably entries per millisecond (confirm with callers).
// A zero or NaN rate yields "0:0:0". Quotients outside the int64 range (e.g. a tiny rate) are
// clamped instead of hitting an undefined double->int64 conversion.
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  // `rate != rate` is true only for NaN.
  if (rate == 0 || rate != rate) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  double  costMs = applyCount / rate;
  int64_t costTime;
  if (costMs >= (double)INT64_MAX) {  // (double)INT64_MAX == 2^63, the first value that does not fit
    costTime = INT64_MAX;
  } else if (costMs <= (double)INT64_MIN) {
    costTime = INT64_MIN;
  } else {
    costTime = (int64_t)costMs;
  }

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1399

1400
/*
 * Fill pBlock for "show vnodes": one row per replica of each vgroup, resuming the
 * SDB_VGROUP iteration stored in pShow->pIter.
 *
 * Columns, in order: dnode id, vgId, db name, sync state, role time, start time,
 * restore flag, estimated restore finish time, unapplied log count, buffer segment used,
 * buffer segment size. When the hosting dnode is offline, sync state is reported as
 * TAOS_SYNC_STATE_OFFLINE and role/start times as 0.
 *
 * Returns the number of rows written (also added to pShow->numOfRows), or the error code
 * from colDataSetVal. On error, the sdb references held on the current vgroup and dnode
 * are released; pShow->pIter is left as-is (NOTE(review): presumably cancelled later via
 * mndCancelGetNextVnode by the show framework — confirm).
 */
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  SVgObj    *pVgroup = NULL;
  SDnodeObj *pDnode = NULL;
  int32_t    cols = 0;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    code = 0;

  // Keep TSDB_MAX_REPLICA rows of headroom for the replicas of the next fetched vgroup.
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      // dnode is online?
      pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);

      // buf receives a VARSTR header followed by either the sync-state string or restoreStr
      // (up to sizeof(restoreStr) - 1 == 19 chars), so it needs 20 + VARSTR_HEADER_SIZE bytes.
      char       buf[20 + VARSTR_HEADER_SIZE] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncState, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set roleTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set startTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      STR_TO_VARSTR(buf, restoreStr);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore finish time, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&unappliedCount, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->bufferSegmentUsed, false);
      if (code != 0) {
        mError("vgId:%d, failed to set buffer segment used, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->bufferSegmentSize, false);
      if (code != 0) {
        mError("vgId:%d, failed to set buffer segment size, since %s", pVgroup->vgId, tstrerror(code));
        goto _err;
      }

      numOfRows++;
      sdbRelease(pSdb, pDnode);
      pDnode = NULL;
    }

    sdbRelease(pSdb, pVgroup);
    pVgroup = NULL;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

_err:
  // Drop the references taken by mndAcquireDnode / sdbFetch for the row being built.
  if (pDnode != NULL) {
    sdbRelease(pSdb, pDnode);
  }
  if (pVgroup != NULL) {
    sdbRelease(pSdb, pVgroup);
  }
  return code;
}
1531

1532
/* Abandon an in-progress SDB_VGROUP iteration started for "show vnodes". */
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
1536

1537
/*
 * Add one replica to pVgroup on a dnode chosen from pArray and record the updated vgroup in pTrans.
 *
 * pArray (SDnodeObj candidates) is sorted in place with mndCompareDnodeVnodes and logged.
 * The first dnode in sorted order that does not already host a replica of pVgroup is chosen.
 * Its vnode limit and memory budget are then checked; if either check fails, the function returns
 * an error rather than trying later candidates.
 *
 * On success:
 *   - the new replica is written to vnodeGid[replica] with sync state OFFLINE;
 *   - the vgroup's memory (mndGetVgroupMemory) is charged to the dnode's memUsed;
 *   - pVgroup->replica and the dnode's numOfVnodes are incremented;
 *   - the encoded vgroup is appended to the transaction's group redo log with status SDB_STATUS_READY.
 *
 * Returns:
 *   - 0 on success;
 *   - TSDB_CODE_MND_NO_ENOUGH_DNODES when every candidate already hosts a replica;
 *   - TSDB_CODE_MND_NO_ENOUGH_VNODES or TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE when the chosen dnode is full;
 *   - otherwise the encode / redo-log / set-status error.
 */
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // Validate before the first dereference of pDnode below.
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    // Skip dnodes that already host a replica of this vgroup.
    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1607

1608
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
14,605✔
1609
                                        SVnodeGid *pDelVgid) {
1610
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
14,605✔
1611
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
65,535✔
1612
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
50,930✔
1613
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
50,930✔
1614
          pDnode->numOfOtherNodes);
1615
  }
1616

1617
  int32_t code = -1;
14,605✔
1618
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
19,731✔
1619
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
19,731✔
1620

1621
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
40,617✔
1622
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
35,491✔
1623
      if (pVgid->dnodeId == pDnode->id) {
35,491✔
1624
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
14,605✔
1625
        pDnode->memUsed -= vgMem;
14,605✔
1626
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
14,605✔
1627
              " used:%" PRId64,
1628
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1629
        pDnode->numOfVnodes--;
14,605✔
1630
        pVgroup->replica--;
14,605✔
1631
        *pDelVgid = *pVgid;
14,605✔
1632
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
14,605✔
1633
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
14,605✔
1634
        code = 0;
14,605✔
1635
        goto _OVER;
14,605✔
1636
      }
1637
    }
1638
  }
1639

UNCOV
1640
_OVER:
×
1641
  if (code != 0) {
14,605✔
UNCOV
1642
    code = TSDB_CODE_APP_ERROR;
×
UNCOV
1643
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
×
1644
           tstrerror(code));
UNCOV
1645
    TAOS_RETURN(code);
×
1646
  }
1647

1648
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
38,886✔
1649
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
24,281✔
1650
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
24,281✔
1651
          pVgid->dnodeId);
1652
  }
1653

1654
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
14,605✔
1655
  if (pVgRaw == NULL) {
14,605✔
1656
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1657
    if (terrno != 0) code = terrno;
×
1658
    TAOS_RETURN(code);
×
1659
  }
1660
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
14,605✔
1661
    sdbFreeRaw(pVgRaw);
×
1662
    TAOS_RETURN(code);
×
1663
  }
1664
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
14,605✔
1665
  if (code != 0) {
14,605✔
1666
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1667
           tstrerror(code), __LINE__);
1668
  }
1669

1670
  TAOS_RETURN(code);
14,605✔
1671
}
1672

1673
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1674
                                                   SVnodeGid *pDelVgid) {
1675
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1676
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1677
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1678
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1679
  }
1680

1681
  int32_t code = -1;
×
1682
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1683
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1684

1685
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1686
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1687
      if (pVgid->dnodeId == pDnode->id) {
×
1688
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1689
        pDnode->memUsed -= vgMem;
×
1690
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1691
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1692
        pDnode->numOfVnodes--;
×
1693
        pVgroup->replica--;
×
1694
        *pDelVgid = *pVgid;
×
1695
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1696
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1697
        code = 0;
×
1698
        goto _OVER;
×
1699
      }
1700
    }
1701
  }
1702

1703
_OVER:
×
1704
  if (code != 0) {
×
1705
    code = TSDB_CODE_APP_ERROR;
×
1706
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1707
    TAOS_RETURN(code);
×
1708
  }
1709

1710
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1711
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1712
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1713
  }
1714

1715
  TAOS_RETURN(code);
×
1716
}
1717

1718
/*
 * Append a TDMT_DND_CREATE_VNODE redo action to pTrans, addressed to the dnode hosting pVgid.
 * TSDB_CODE_VND_ALREADY_EXIST is treated as an acceptable reply.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot be built, returns
 * terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset). On append failure, returns
 * that error after freeing the request.
 */
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // pDnode is an input to the request builder, so keep the reference until the request is built.
  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  pDnode = NULL;
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1744

1745
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
6,070✔
1746
                                       SDnodeObj *pDnode) {
1747
  int32_t      code = 0;
6,070✔
1748
  STransAction action = {0};
6,070✔
1749

1750
  action.epSet = mndGetDnodeEpset(pDnode);
6,070✔
1751

1752
  int32_t contLen = 0;
6,070✔
1753
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
6,070✔
1754
  if (pReq == NULL) {
6,070✔
1755
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1756
    if (terrno != 0) code = terrno;
×
1757
    TAOS_RETURN(code);
×
1758
  }
1759

1760
  action.pCont = pReq;
6,070✔
1761
  action.contLen = contLen;
6,070✔
1762
  action.msgType = TDMT_DND_CREATE_VNODE;
6,070✔
1763
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
6,070✔
1764
  action.groupId = pVgroup->vgId;
6,070✔
1765

1766
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
6,070✔
1767
    taosMemoryFree(pReq);
×
1768
    TAOS_RETURN(code);
×
1769
  }
1770

1771
  TAOS_RETURN(code);
6,070✔
1772
}
1773

1774
/*
 * Append a TDMT_VND_ALTER_CONFIRM redo action for pVgroup to pTrans.
 *
 * The message is a bare SMsgHead whose contLen and vgId are in network byte order.
 * TSDB_CODE_VND_INVALID_VGROUP_ID is marked retryable. pDb is unused.
 *
 * Returns 0 on success, terrno if the header cannot be allocated, or the append error
 * (after freeing the header).
 */
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction confirm = {0};
  confirm.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);

  int32_t   msgLen = sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  confirm.pCont = pMsg;
  confirm.contLen = msgLen;
  confirm.msgType = TDMT_VND_ALTER_CONFIRM;
  // An incorrect redirect result can surface as this error, so it is retried.
  confirm.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  confirm.groupId = pVgroup->vgId;

  int32_t code = mndTransAppendRedoAction(pTrans, &confirm);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1803

1804
/*
 * Append a TDMT_SYNC_CONFIG_CHANGE redo action for pNewVgroup to pTrans.
 *
 * The payload is the alter-replica request built for dnodeId, prefixed with an SMsgHead
 * (contLen and vgId in network byte order). pOldVgroup is unused.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the request cannot be
 * built, terrno if the framed message cannot be allocated, or the append error (after freeing
 * the message).
 */
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      bodyLen = 0;
  STransAction cfgAction = {0};

  cfgAction.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  void *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Frame the request body behind a message header addressed to the new vgroup.
  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }
  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  cfgAction.pCont = pMsg;
  cfgAction.contLen = msgLen;
  cfgAction.msgType = TDMT_SYNC_CONFIG_CHANGE;

  code = mndTransAppendRedoAction(pTrans, &cfgAction);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1843

1844
/*
 * Append a TDMT_VND_ALTER_HASHRANGE redo action to pTrans that re-targets pVgroup using the
 * hash-range request built from srcVgId. TSDB_CODE_VND_ALREADY_EXIST is accepted.
 *
 * Logs and returns 0 on success. Returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the
 * request cannot be built, or the append error (after freeing the request).
 */
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  STransAction hashAction = {0};
  int32_t      reqLen = 0;
  int32_t      code = 0;

  hashAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  void *pHashReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &reqLen);
  if (pHashReq == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  hashAction.msgType = TDMT_VND_ALTER_HASHRANGE;
  hashAction.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  hashAction.pCont = pHashReq;
  hashAction.contLen = reqLen;

  code = mndTransAppendRedoAction(pTrans, &hashAction);
  if (code != 0) {
    taosMemoryFree(pHashReq);
  } else {
    mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId,
          pVgroup->vgId);
  }
  TAOS_RETURN(code);
}
1870

1871
/*
 * Append a TDMT_VND_ALTER_CONFIG redo action for pVgroup to pTrans. The payload is built from pDb.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the request cannot be
 * built, or the append error (after freeing the request).
 */
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction cfgAction = {0};
  int32_t      cfgLen = 0;
  int32_t      code = 0;
  void        *pCfgReq = NULL;

  cfgAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  pCfgReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &cfgLen);

  if (pCfgReq != NULL) {
    cfgAction.pCont = pCfgReq;
    cfgAction.contLen = cfgLen;
    cfgAction.msgType = TDMT_VND_ALTER_CONFIG;
    cfgAction.groupId = pVgroup->vgId;

    code = mndTransAppendRedoAction(pTrans, &cfgAction);
    if (code != 0) {
      taosMemoryFree(pCfgReq);
    }
  } else {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

  TAOS_RETURN(code);
}
1896

1897
/*
 * Encode pVg and append it to pTrans's prepare log with status SDB_STATUS_CREATING.
 *
 * If encoding or appending fails, the raw is freed here. Once appended, it is no longer freed
 * by this function, matching the original ownership hand-off.
 *
 * Returns 0 on success, or the error from encoding, appending, or setting the raw status.
 */
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _err;
  }

  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);

  // Propagate a status failure instead of silently returning success; the raw is already in
  // the prepare log at this point, so it is not freed here.
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }
  pRaw = NULL;
  TAOS_RETURN(code);

_err:
  sdbFreeRaw(pRaw);
  TAOS_RETURN(code);
}
1921

1922
/*
 * Append a TDMT_VND_ALTER_REPLICA redo action for pVgroup to pTrans, addressed to dnodeId.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the dnode cannot be
 * acquired or the request cannot be built, or the append error (after freeing the request).
 */
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  void        *pReplicaReq = NULL;
  STransAction replicaAction = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  replicaAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  pReplicaReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pReplicaReq == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  replicaAction.pCont = pReplicaReq;
  replicaAction.contLen = reqLen;
  replicaAction.msgType = TDMT_VND_ALTER_REPLICA;
  replicaAction.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &replicaAction);
  if (code != 0) {
    taosMemoryFree(pReplicaReq);
  }
  TAOS_RETURN(code);
}
1955

1956
/*
 * Append a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action for pVgroup to pTrans, addressed
 * to dnodeId.
 *
 * TSDB_CODE_VND_ALREADY_IS_VOTER is accepted as success; TSDB_CODE_VND_NOT_CATCH_UP is retried.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the dnode cannot be
 * acquired or the request cannot be built, or the append error (after freeing the request).
 */
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      checkLen = 0;
  void        *pCheckReq = NULL;
  STransAction checkAction = {0};

  SDnodeObj *pLearnerDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pLearnerDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  checkAction.epSet = mndGetDnodeEpset(pLearnerDnode);
  mndReleaseDnode(pMnode, pLearnerDnode);

  pCheckReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &checkLen);
  if (pCheckReq == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  checkAction.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  checkAction.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  checkAction.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  checkAction.pCont = pCheckReq;
  checkAction.contLen = checkLen;

  code = mndTransAppendRedoAction(pTrans, &checkAction);
  if (code != 0) {
    taosMemoryFree(pCheckReq);
  }
  TAOS_RETURN(code);
}
1990

1991
/*
 * Append a TDMT_DND_ALTER_VNODE_TYPE redo action for pVgroup to pTrans, addressed to dnodeId.
 *
 * The payload is the alter-replica request for that dnode. TSDB_CODE_VND_ALREADY_IS_VOTER is
 * accepted; TSDB_CODE_VND_NOT_CATCH_UP is retried.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the dnode cannot be
 * acquired or the request cannot be built, or the append error (after freeing the request).
 */
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      typeReqLen = 0;
  void        *pTypeReq = NULL;
  STransAction typeAction = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode == NULL) {
    goto _null;
  }
  typeAction.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  pTypeReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &typeReqLen);
  if (pTypeReq == NULL) {
    goto _null;
  }

  typeAction.pCont = pTypeReq;
  typeAction.contLen = typeReqLen;
  typeAction.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  typeAction.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  typeAction.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  typeAction.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &typeAction);
  if (code != 0) {
    taosMemoryFree(pTypeReq);
  }
  TAOS_RETURN(code);

_null:
  // Shared failure path for a missing dnode or an unbuildable request.
  code = TSDB_CODE_MND_RETURN_VALUE_NULL;
  if (terrno != 0) code = terrno;
  TAOS_RETURN(code);
}
2026

2027
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
6,070✔
2028
                                          SDnodeObj *pDnode) {
2029
  int32_t      code = 0;
6,070✔
2030
  STransAction action = {0};
6,070✔
2031
  action.epSet = mndGetDnodeEpset(pDnode);
6,070✔
2032

2033
  int32_t contLen = 0;
6,070✔
2034
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
6,070✔
2035
  if (pReq == NULL) {
6,070✔
2036
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2037
    if (terrno != 0) code = terrno;
×
2038
    TAOS_RETURN(code);
×
2039
  }
2040

2041
  action.pCont = pReq;
6,070✔
2042
  action.contLen = contLen;
6,070✔
2043
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
6,070✔
2044
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
6,070✔
2045
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
6,070✔
2046
  action.groupId = pVgroup->vgId;
6,070✔
2047

2048
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
6,070✔
2049
    taosMemoryFree(pReq);
×
2050
    TAOS_RETURN(code);
×
2051
  }
2052

2053
  TAOS_RETURN(code);
6,070✔
2054
}
2055

2056
/*
 * Append a TDMT_VND_DISABLE_WRITE redo action for pVgroup's vgId to pTrans, addressed to dnodeId.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the dnode cannot be
 * acquired or the request cannot be built, or the append error (after freeing the request).
 */
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      disableLen = 0;
  void        *pDisableReq = NULL;
  STransAction disableAction = {0};

  SDnodeObj *pHost = mndAcquireDnode(pMnode, dnodeId);
  if (pHost == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  disableAction.epSet = mndGetDnodeEpset(pHost);
  mndReleaseDnode(pMnode, pHost);

  pDisableReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &disableLen);
  if (pDisableReq == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  disableAction.msgType = TDMT_VND_DISABLE_WRITE;
  disableAction.pCont = pDisableReq;
  disableAction.contLen = disableLen;

  code = mndTransAppendRedoAction(pTrans, &disableAction);
  if (code != 0) {
    taosMemoryFree(pDisableReq);
  }
  TAOS_RETURN(code);
}
2089

2090
/*
 * Append a TDMT_DND_DROP_VNODE action for pVgroup to pTrans, addressed to the dnode hosting pVgid.
 *
 * When isRedo is true the action goes to the redo list, otherwise to the undo list.
 * TSDB_CODE_VND_NOT_EXIST is treated as an acceptable reply.
 *
 * Returns 0 on success, terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if the dnode cannot be
 * acquired or the request cannot be built, or the append error (after freeing the request).
 */
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // pDnode is an input to the request builder, so keep the reference until the request is built.
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  pDnode = NULL;
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  if (isRedo) {
    code = mndTransAppendRedoAction(pTrans, &action);
  } else {
    code = mndTransAppendUndoAction(pTrans, &action);
  }
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2132

2133
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
16,849✔
2134
                                    SArray *pArray, bool force, bool unsafe) {
2135
  int32_t code = 0;
16,849✔
2136
  SVgObj  newVg = {0};
16,849✔
2137
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
16,849✔
2138

2139
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
16,849✔
2140
  for (int32_t i = 0; i < newVg.replica; ++i) {
54,462✔
2141
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
37,613✔
2142
  }
2143

2144
  if (!force) {
16,849✔
2145
#if 1
2146
    {
2147
#else
2148
    if (newVg.replica == 1) {
2149
#endif
2150
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
16,849✔
2151
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
16,849✔
2152
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
54,462✔
2153
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
37,613✔
2154
      }
2155
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
16,849✔
2156
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
16,849✔
2157

2158
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
16,849✔
2159
      newVg.replica--;
16,849✔
2160
      SVnodeGid del = newVg.vnodeGid[vnIndex];
16,849✔
2161
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
16,849✔
2162
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
16,849✔
2163
      {
2164
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
16,849✔
2165
        if (pRaw == NULL) {
16,849✔
2166
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2167
          if (terrno != 0) code = terrno;
×
2168
          TAOS_RETURN(code);
×
2169
        }
2170
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
16,849✔
2171
          sdbFreeRaw(pRaw);
×
2172
          TAOS_RETURN(code);
×
2173
        }
2174
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
16,849✔
2175
        if (code != 0) {
16,849✔
2176
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2177
          return code;
×
2178
        }
2179
      }
2180

2181
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
16,849✔
2182
      for (int32_t i = 0; i < newVg.replica; ++i) {
54,462✔
2183
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
37,613✔
2184
      }
2185
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
16,849✔
2186
#if 1
2187
    }
2188
#else
2189
    } else {  // new replica == 3
2190
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2191
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2192
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2193
      newVg.replica--;
2194
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2195
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2196
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2197
      {
2198
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2199
        if (pRaw == NULL) return -1;
2200
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2201
          sdbFreeRaw(pRaw);
2202
          return -1;
2203
        }
2204
      }
2205

2206
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2207
      for (int32_t i = 0; i < newVg.replica; ++i) {
2208
        if (i == vnIndex) continue;
2209
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2210
      }
2211
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2212
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2213
    }
2214
#endif
2215
  } else {
2216
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2217
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2218
    newVg.replica--;
×
2219
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2220
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2221
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2222
    {
2223
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2224
      if (pRaw == NULL) {
×
2225
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2226
        if (terrno != 0) code = terrno;
×
2227
        TAOS_RETURN(code);
×
2228
      }
2229
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2230
        sdbFreeRaw(pRaw);
×
2231
        TAOS_RETURN(code);
×
2232
      }
2233
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2234
      if (code != 0) {
×
2235
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2236
        return code;
×
2237
      }
2238
    }
2239

2240
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2241
      if (i != vnIndex) {
×
2242
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2243
      }
2244
    }
2245
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2246
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2247

2248
    if (newVg.replica == 1) {
×
2249
      if (force && !unsafe) {
×
2250
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2251
      }
2252

2253
      SSdb *pSdb = pMnode->pSdb;
×
2254
      void *pIter = NULL;
×
2255

2256
      while (1) {
×
2257
        SStbObj *pStb = NULL;
×
2258
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2259
        if (pIter == NULL) break;
×
2260

2261
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2262
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2263
            sdbCancelFetch(pSdb, pIter);
×
2264
            sdbRelease(pSdb, pStb);
×
2265
            TAOS_RETURN(code);
×
2266
          }
2267
        }
2268

2269
        sdbRelease(pSdb, pStb);
×
2270
      }
2271

2272
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2273
    }
2274
  }
2275

2276
  {
2277
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
16,849✔
2278
    if (pRaw == NULL) {
16,849✔
2279
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2280
      if (terrno != 0) code = terrno;
×
2281
      TAOS_RETURN(code);
×
2282
    }
2283
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
16,849✔
2284
      sdbFreeRaw(pRaw);
×
2285
      TAOS_RETURN(code);
×
2286
    }
2287
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
16,849✔
2288
    if (code != 0) {
16,849✔
2289
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2290
      return code;
×
2291
    }
2292
  }
2293

2294
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
16,849✔
2295
  for (int32_t i = 0; i < newVg.replica; ++i) {
54,462✔
2296
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
37,613✔
2297
  }
2298
  TAOS_RETURN(code);
16,849✔
2299
}
2300

2301
/*
 * For every vgroup with a vnode on dnode delDnodeId, add to pTrans the actions
 * that move that vnode to another dnode (see mndSetMoveVgroupInfoToTrans).
 * Candidate target dnodes come from mndBuildDnodesArray(pMnode, delDnodeId, NULL).
 *
 * Stops at the first failing vgroup. A vgroup whose database cannot be acquired
 * is treated as a failure instead of being processed with a NULL database.
 * Returns 0 on success or an error code (also stored in terrno).
 */
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // The move actions dereference pDb, so a missing database must not be passed on.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2344

2345
/*
 * Grow pVgroup by one vnode placed on dnode newDnodeId and record the steps in pTrans.
 *
 * The new vnode is written into slot `replica` as an offline learner and the
 * updated vgroup is appended as a redo log. The existing replicas then get an
 * alter-replica action and the new vnode a create action. The new vnode is then
 * switched to voter with an alter-type action, the existing replicas are altered
 * once more, and a confirm action closes the sequence.
 *
 * pVgroup is modified in place (replica count and the new slot).
 * Returns 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // The new member occupies the first free slot and starts as a learner.
  SVnodeGid *pNewGid = pVgroup->vnodeGid + pVgroup->replica;
  pNewGid->dnodeId = newDnodeId;
  pNewGid->syncState = TAOS_SYNC_STATE_OFFLINE;
  pNewGid->nodeRole = TAOS_SYNC_ROLE_LEARNER;
  pVgroup->replica++;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // Phase 1: tell the existing replicas about the learner, then create it.
  for (int32_t idx = 0; idx < pVgroup->replica - 1; idx++) {
    int32_t peerDnodeId = pVgroup->vnodeGid[idx].dnodeId;
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, peerDnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pNewGid));

  // Phase 2: promote the new vnode to voter and re-align the existing replicas.
  pNewGid->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pNewGid->dnodeId));
  for (int32_t idx = 0; idx < pVgroup->replica - 1; idx++) {
    int32_t peerDnodeId = pVgroup->vnodeGid[idx].dnodeId;
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, peerDnodeId));
  }

  // Phase 3: confirm.
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2391

2392
/*
 * Shrink pVgroup by removing its vnode on dnode delDnodeId and record the steps
 * in pTrans. If no vnode of the vgroup is on that dnode, nothing is done and 0
 * is returned.
 *
 * The last occupied slot is moved into the removed slot and then cleared. The
 * updated vgroup is appended as a redo log. The drop action for the removed
 * vnode, an alter-replica action per remaining vnode, and a confirm action are
 * then added. pVgroup is modified in place.
 * Returns 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  int32_t slot = 0;
  while (slot < pVgroup->replica && pVgroup->vnodeGid[slot].dnodeId != delDnodeId) {
    slot++;
  }
  if (slot >= pVgroup->replica) {
    return 0;  // the vgroup has no vnode on delDnodeId
  }

  SVnodeGid removed = {0};
  memcpy(&removed, &pVgroup->vnodeGid[slot], sizeof(SVnodeGid));

  pVgroup->replica--;
  int32_t lastSlot = pVgroup->replica;
  if (slot != lastSlot) {
    memcpy(&pVgroup->vnodeGid[slot], &pVgroup->vnodeGid[lastSlot], sizeof(SVnodeGid));
  }
  memset(&pVgroup->vnodeGid[lastSlot], 0, sizeof(SVnodeGid));

  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &removed, true));
  for (int32_t idx = 0; idx < pVgroup->replica; idx++) {
    int32_t peerDnodeId = pVgroup->vnodeGid[idx].dnodeId;
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, peerDnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2437

2438
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
37,338✔
2439
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2440
                                     SDnodeObj *pOld3) {
2441
  int32_t code = -1;
37,338✔
2442
  STrans *pTrans = NULL;
37,338✔
2443

2444
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
37,338✔
2445
  if (pTrans == NULL) {
37,338✔
2446
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2447
    if (terrno != 0) code = terrno;
×
2448
    goto _OVER;
×
2449
  }
2450

2451
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
37,338✔
2452
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
37,338✔
2453
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);
37,115✔
2454

2455
  mndTransSetSerial(pTrans);
37,115✔
2456
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
37,115✔
2457

2458
  SVgObj newVg = {0};
37,115✔
2459
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
37,115✔
2460
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
37,115✔
2461
  for (int32_t i = 0; i < newVg.replica; ++i) {
123,660✔
2462
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
86,545✔
2463
          syncStr(newVg.vnodeGid[i].syncState));
2464
  }
2465

2466
  if (pNew1 != NULL && pOld1 != NULL) {
37,115✔
2467
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
37,115✔
2468
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
37,115✔
2469
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
708✔
2470
             pNew1->numOfSupportVnodes);
2471
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
708✔
2472
      goto _OVER;
708✔
2473
    }
2474

2475
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
36,407✔
2476
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
36,407✔
2477
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2478
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2479
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2480
      goto _OVER;
×
2481
    } else {
2482
      pNew1->memUsed += vgMem;
36,407✔
2483
    }
2484

2485
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
36,407✔
2486
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
36,407✔
2487
  }
2488

2489
  if (pNew2 != NULL && pOld2 != NULL) {
36,407✔
2490
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
9,830✔
2491
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
9,830✔
2492
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2493
             pNew2->numOfSupportVnodes);
2494
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2495
      goto _OVER;
×
2496
    }
2497
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
9,830✔
2498
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
9,830✔
2499
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2500
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2501
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2502
      goto _OVER;
×
2503
    } else {
2504
      pNew2->memUsed += vgMem;
9,830✔
2505
    }
2506
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
9,830✔
2507
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
9,830✔
2508
  }
2509

2510
  if (pNew3 != NULL && pOld3 != NULL) {
36,407✔
2511
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
2,862✔
2512
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
2,862✔
2513
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2514
             pNew3->numOfSupportVnodes);
2515
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2516
      goto _OVER;
×
2517
    }
2518
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
2,862✔
2519
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
2,862✔
2520
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2521
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2522
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2523
      goto _OVER;
×
2524
    } else {
2525
      pNew3->memUsed += vgMem;
2,862✔
2526
    }
2527
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
2,862✔
2528
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
2,862✔
2529
  }
2530

2531
  {
2532
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
36,407✔
2533
    if (pRaw == NULL) {
36,407✔
2534
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2535
      if (terrno != 0) code = terrno;
×
2536
      goto _OVER;
×
2537
    }
2538
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
36,407✔
2539
      sdbFreeRaw(pRaw);
×
2540
      goto _OVER;
×
2541
    }
2542
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
36,407✔
2543
    if (code != 0) {
36,407✔
2544
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2545
      goto _OVER;
×
2546
    }
2547
  }
2548

2549
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
36,407✔
2550
  for (int32_t i = 0; i < newVg.replica; ++i) {
120,828✔
2551
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
84,421✔
2552
  }
2553

2554
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
36,407✔
2555
  code = 0;
34,177✔
2556

2557
_OVER:
37,338✔
2558
  mndTransDrop(pTrans);
37,338✔
2559
  mndReleaseDb(pMnode, pDb);
37,338✔
2560
  TAOS_RETURN(code);
37,338✔
2561
}
2562

2563
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
45,201✔
2564
  SMnode    *pMnode = pReq->info.node;
45,201✔
2565
  SDnodeObj *pNew1 = NULL;
45,201✔
2566
  SDnodeObj *pNew2 = NULL;
45,201✔
2567
  SDnodeObj *pNew3 = NULL;
45,201✔
2568
  SDnodeObj *pOld1 = NULL;
45,201✔
2569
  SDnodeObj *pOld2 = NULL;
45,201✔
2570
  SDnodeObj *pOld3 = NULL;
45,201✔
2571
  SVgObj    *pVgroup = NULL;
45,201✔
2572
  SDbObj    *pDb = NULL;
45,201✔
2573
  int32_t    code = -1;
45,201✔
2574
  int64_t    curMs = taosGetTimestampMs();
45,201✔
2575
  int32_t    newDnodeId[3] = {0};
45,201✔
2576
  int32_t    oldDnodeId[3] = {0};
45,201✔
2577
  int32_t    newIndex = -1;
45,201✔
2578
  int32_t    oldIndex = -1;
45,201✔
2579

2580
  SRedistributeVgroupReq req = {0};
45,201✔
2581
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
45,201✔
2582
    code = TSDB_CODE_INVALID_MSG;
×
2583
    goto _OVER;
×
2584
  }
2585

2586
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
45,201✔
2587
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
45,201✔
2588
    goto _OVER;
350✔
2589
  }
2590

2591
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
44,851✔
2592
  if (pVgroup == NULL) {
44,851✔
2593
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
2,124✔
2594
    if (terrno != 0) code = terrno;
2,124✔
2595
    goto _OVER;
2,124✔
2596
  }
2597
  if (pVgroup->mountVgId) {
42,727✔
2598
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
2599
    goto _OVER;
×
2600
  }
2601
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
42,727✔
2602
  if (pDb == NULL) {
42,727✔
2603
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2604
    if (terrno != 0) code = terrno;
×
2605
    goto _OVER;
×
2606
  }
2607

2608
  if (pVgroup->replica == 1) {
42,727✔
2609
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
11,941✔
2610
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2611
      goto _OVER;
×
2612
    }
2613

2614
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
11,941✔
2615
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2616
      code = 0;
×
2617
      goto _OVER;
×
2618
    }
2619

2620
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
11,941✔
2621
    if (pNew1 == NULL) {
11,941✔
2622
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2623
      if (terrno != 0) code = terrno;
×
2624
      goto _OVER;
×
2625
    }
2626
    if (!mndIsDnodeOnline(pNew1, curMs)) {
11,941✔
UNCOV
2627
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2628
      goto _OVER;
×
2629
    }
2630

2631
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
11,941✔
2632
    if (pOld1 == NULL) {
11,941✔
2633
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2634
      if (terrno != 0) code = terrno;
×
2635
      goto _OVER;
×
2636
    }
2637
    if (!mndIsDnodeOnline(pOld1, curMs)) {
11,941✔
2638
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
42✔
2639
      goto _OVER;
42✔
2640
    }
2641

2642
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
11,899✔
2643

2644
  } else if (pVgroup->replica == 3) {
30,786✔
2645
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
29,338✔
2646
      code = TSDB_CODE_MND_INVALID_REPLICA;
2,832✔
2647
      goto _OVER;
2,832✔
2648
    }
2649

2650
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
26,506✔
2651
      code = TSDB_CODE_MND_INVALID_REPLICA;
708✔
2652
      goto _OVER;
708✔
2653
    }
2654

2655
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
25,798✔
2656
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
17,390✔
2657
      newDnodeId[++newIndex] = req.dnodeId1;
9,242✔
2658
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
9,242✔
2659
    }
2660

2661
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
25,798✔
2662
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
14,091✔
2663
      newDnodeId[++newIndex] = req.dnodeId2;
11,267✔
2664
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
11,267✔
2665
    }
2666

2667
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
25,798✔
2668
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
19,073✔
2669
      newDnodeId[++newIndex] = req.dnodeId3;
15,530✔
2670
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
15,530✔
2671
    }
2672

2673
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
25,798✔
2674
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
15,938✔
2675
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
12,036✔
2676
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
12,036✔
2677
    }
2678

2679
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
25,798✔
2680
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
15,543✔
2681
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
12,720✔
2682
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
12,720✔
2683
    }
2684

2685
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
25,798✔
2686
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
14,826✔
2687
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
11,283✔
2688
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
11,283✔
2689
    }
2690

2691
    if (newDnodeId[0] != 0) {
25,798✔
2692
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
24,731✔
2693
      if (pNew1 == NULL) {
24,731✔
2694
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2695
        if (terrno != 0) code = terrno;
×
2696
        goto _OVER;
×
2697
      }
2698
      if (!mndIsDnodeOnline(pNew1, curMs)) {
24,731✔
2699
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
708✔
2700
        goto _OVER;
708✔
2701
      }
2702
    }
2703

2704
    if (newDnodeId[1] != 0) {
25,090✔
2705
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
8,414✔
2706
      if (pNew2 == NULL) {
8,414✔
2707
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2708
        if (terrno != 0) code = terrno;
×
2709
        goto _OVER;
×
2710
      }
2711
      if (!mndIsDnodeOnline(pNew2, curMs)) {
8,414✔
2712
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2713
        goto _OVER;
×
2714
      }
2715
    }
2716

2717
    if (newDnodeId[2] != 0) {
25,090✔
2718
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
2,894✔
2719
      if (pNew3 == NULL) {
2,894✔
2720
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2721
        if (terrno != 0) code = terrno;
×
2722
        goto _OVER;
×
2723
      }
2724
      if (!mndIsDnodeOnline(pNew3, curMs)) {
2,894✔
2725
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2726
        goto _OVER;
×
2727
      }
2728
    }
2729

2730
    if (oldDnodeId[0] != 0) {
25,090✔
2731
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
24,023✔
2732
      if (pOld1 == NULL) {
24,023✔
2733
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2734
        if (terrno != 0) code = terrno;
×
2735
        goto _OVER;
×
2736
      }
2737
      if (!mndIsDnodeOnline(pOld1, curMs)) {
24,023✔
2738
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
32✔
2739
        goto _OVER;
32✔
2740
      }
2741
    }
2742

2743
    if (oldDnodeId[1] != 0) {
25,058✔
2744
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
8,382✔
2745
      if (pOld2 == NULL) {
8,382✔
2746
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2747
        if (terrno != 0) code = terrno;
×
2748
        goto _OVER;
×
2749
      }
2750
      if (!mndIsDnodeOnline(pOld2, curMs)) {
8,382✔
2751
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2752
        goto _OVER;
×
2753
      }
2754
    }
2755

2756
    if (oldDnodeId[2] != 0) {
25,058✔
2757
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
2,862✔
2758
      if (pOld3 == NULL) {
2,862✔
2759
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2760
        if (terrno != 0) code = terrno;
×
2761
        goto _OVER;
×
2762
      }
2763
      if (!mndIsDnodeOnline(pOld3, curMs)) {
2,862✔
2764
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2765
        goto _OVER;
×
2766
      }
2767
    }
2768

2769
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
25,058✔
2770
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2771
      code = 0;
1,067✔
2772
      goto _OVER;
1,067✔
2773
    }
2774

2775
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
23,991✔
2776

2777
  } else if (pVgroup->replica == 2) {
1,448✔
2778
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
1,448✔
2779
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2780
      goto _OVER;
×
2781
    }
2782

2783
    if (req.dnodeId1 == req.dnodeId2) {
1,448✔
2784
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2785
      goto _OVER;
×
2786
    }
2787

2788
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
1,448✔
2789
      newDnodeId[++newIndex] = req.dnodeId1;
1,448✔
2790
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,448✔
2791
    }
2792

2793
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,448✔
2794
      newDnodeId[++newIndex] = req.dnodeId2;
1,448✔
2795
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,448✔
2796
    }
2797

2798
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
1,448✔
2799
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
1,448✔
2800
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,448✔
2801
    }
2802

2803
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,448✔
2804
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
1,448✔
2805
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,448✔
2806
    }
2807

2808
    if (newDnodeId[0] != 0) {
1,448✔
2809
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
1,448✔
2810
      if (pNew1 == NULL) {
1,448✔
2811
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2812
        if (terrno != 0) code = terrno;
×
2813
        goto _OVER;
×
2814
      }
2815
      if (!mndIsDnodeOnline(pNew1, curMs)) {
1,448✔
2816
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2817
        goto _OVER;
×
2818
      }
2819
    }
2820

2821
    if (newDnodeId[1] != 0) {
1,448✔
2822
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
1,448✔
2823
      if (pNew2 == NULL) {
1,448✔
2824
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2825
        if (terrno != 0) code = terrno;
×
2826
        goto _OVER;
×
2827
      }
2828
      if (!mndIsDnodeOnline(pNew2, curMs)) {
1,448✔
2829
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2830
        goto _OVER;
×
2831
      }
2832
    }
2833

2834
    if (oldDnodeId[0] != 0) {
1,448✔
2835
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
1,448✔
2836
      if (pOld1 == NULL) {
1,448✔
2837
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2838
        if (terrno != 0) code = terrno;
×
2839
        goto _OVER;
×
2840
      }
2841
      if (!mndIsDnodeOnline(pOld1, curMs)) {
1,448✔
2842
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2843
        goto _OVER;
×
2844
      }
2845
    }
2846

2847
    if (oldDnodeId[1] != 0) {
1,448✔
2848
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
1,448✔
2849
      if (pOld2 == NULL) {
1,448✔
2850
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2851
        if (terrno != 0) code = terrno;
×
2852
        goto _OVER;
×
2853
      }
2854
      if (!mndIsDnodeOnline(pOld2, curMs)) {
1,448✔
2855
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2856
        goto _OVER;
×
2857
      }
2858
    }
2859

2860
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
1,448✔
2861
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2862
      code = 0;
×
2863
      goto _OVER;
×
2864
    }
2865

2866
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
1,448✔
2867
  } else {
2868
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2869
    goto _OVER;
×
2870
  }
2871

2872
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
37,338✔
2873

2874
  char obj[33] = {0};
37,338✔
2875
  (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
37,338✔
2876

2877
  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
37,338✔
2878

2879
_OVER:
45,201✔
2880
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
45,201✔
2881
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
9,957✔
2882
           req.dnodeId3, tstrerror(code));
2883
  }
2884

2885
  mndReleaseDnode(pMnode, pNew1);
45,201✔
2886
  mndReleaseDnode(pMnode, pNew2);
45,201✔
2887
  mndReleaseDnode(pMnode, pNew3);
45,201✔
2888
  mndReleaseDnode(pMnode, pOld1);
45,201✔
2889
  mndReleaseDnode(pMnode, pOld2);
45,201✔
2890
  mndReleaseDnode(pMnode, pOld3);
45,201✔
2891
  mndReleaseVgroup(pMnode, pVgroup);
45,201✔
2892
  mndReleaseDb(pMnode, pDb);
45,201✔
2893
  tFreeSRedistributeVgroupReq(&req);
45,201✔
2894

2895
  TAOS_RETURN(code);
45,201✔
2896
}
2897

2898
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
6,436✔
2899
  SForceBecomeFollowerReq balanceReq = {
6,436✔
2900
      .vgId = pVgroup->vgId,
6,436✔
2901
  };
2902

2903
  int32_t contLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
6,436✔
2904
  if (contLen < 0) {
6,436✔
2905
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2906
    return NULL;
×
2907
  }
2908
  contLen += sizeof(SMsgHead);
6,436✔
2909

2910
  void *pReq = taosMemoryMalloc(contLen);
6,436✔
2911
  if (pReq == NULL) {
6,436✔
2912
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2913
    return NULL;
×
2914
  }
2915

2916
  SMsgHead *pHead = pReq;
6,436✔
2917
  pHead->contLen = htonl(contLen);
6,436✔
2918
  pHead->vgId = htonl(pVgroup->vgId);
6,436✔
2919

2920
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), contLen, &balanceReq) < 0) {
6,436✔
2921
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2922
    taosMemoryFree(pReq);
×
2923
    return NULL;
×
2924
  }
2925
  *pContLen = contLen;
6,436✔
2926
  return pReq;
6,436✔
2927
}
2928

2929
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
6,436✔
2930
  int32_t    code = 0;
6,436✔
2931
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
6,436✔
2932
  if (pDnode == NULL) {
6,436✔
2933
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2934
    if (terrno != 0) code = terrno;
×
2935
    TAOS_RETURN(code);
×
2936
  }
2937

2938
  STransAction action = {0};
6,436✔
2939
  action.epSet = mndGetDnodeEpset(pDnode);
6,436✔
2940
  mndReleaseDnode(pMnode, pDnode);
6,436✔
2941

2942
  int32_t contLen = 0;
6,436✔
2943
  void   *pReq = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen);
6,436✔
2944
  if (pReq == NULL) {
6,436✔
2945
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2946
    if (terrno != 0) code = terrno;
×
2947
    TAOS_RETURN(code);
×
2948
  }
2949

2950
  action.pCont = pReq;
6,436✔
2951
  action.contLen = contLen;
6,436✔
2952
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;
6,436✔
2953

2954
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
6,436✔
2955
    taosMemoryFree(pReq);
×
2956
    TAOS_RETURN(code);
×
2957
  }
2958

2959
  TAOS_RETURN(code);
6,436✔
2960
}
2961

2962
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
38,616✔
2963
                                          int32_t *pContLen, int32_t ms) {
2964
  SAlterVnodeElectBaselineReq alterReq = {
38,616✔
2965
      .vgId = pVgroup->vgId,
38,616✔
2966
      .electBaseLine = ms,
2967
  };
2968

2969
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
38,616✔
2970
  if (contLen < 0) {
38,616✔
2971
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2972
    return NULL;
×
2973
  }
2974

2975
  void *pReq = taosMemoryMalloc(contLen);
38,616✔
2976
  if (pReq == NULL) {
38,616✔
2977
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2978
    return NULL;
×
2979
  }
2980

2981
  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
38,616✔
2982
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
×
2983
    taosMemoryFree(pReq);
×
2984
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2985
    return NULL;
×
2986
  }
2987
  *pContLen = contLen;
38,616✔
2988
  return pReq;
38,616✔
2989
}
2990

2991
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
38,616✔
2992
  int32_t    code = 0;
38,616✔
2993
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
38,616✔
2994
  if (pDnode == NULL) {
38,616✔
2995
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2996
    if (terrno != 0) code = terrno;
×
2997
    TAOS_RETURN(code);
×
2998
  }
2999

3000
  STransAction action = {0};
38,616✔
3001
  action.epSet = mndGetDnodeEpset(pDnode);
38,616✔
3002
  mndReleaseDnode(pMnode, pDnode);
38,616✔
3003

3004
  int32_t contLen = 0;
38,616✔
3005
  void   *pReq = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &contLen, ms);
38,616✔
3006
  if (pReq == NULL) {
38,616✔
3007
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3008
    if (terrno != 0) code = terrno;
×
3009
    TAOS_RETURN(code);
×
3010
  }
3011

3012
  action.pCont = pReq;
38,616✔
3013
  action.contLen = contLen;
38,616✔
3014
  action.msgType = TDMT_VND_ALTER_ELECTBASELINE;
38,616✔
3015
  action.groupId = pVgroup->vgId;
38,616✔
3016

3017
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
38,616✔
3018
    taosMemoryFree(pReq);
×
3019
    TAOS_RETURN(code);
×
3020
  }
3021

3022
  TAOS_RETURN(code);
38,616✔
3023
}
3024

3025
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index){
12,872✔
3026
  int32_t code = 0;
12,872✔
3027
  SSdb   *pSdb = pMnode->pSdb;
12,872✔
3028

3029
  int32_t vgid = pVgroup->vgId;
12,872✔
3030
  int8_t  replica = pVgroup->replica;
12,872✔
3031

3032
  if (pVgroup->replica <= 1) {
12,872✔
3033
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
×
3034
    return -1;
×
3035
  }
3036

3037
  for(int32_t i = 0; i < 3; i++){
51,488✔
3038
    if(i == index%3){
38,616✔
3039
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, pVgroup->vnodeGid[i].dnodeId);
6,436✔
3040
      TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup,
6,436✔
3041
                                                                      pVgroup->vnodeGid[i].dnodeId, 1500));
3042
    }
3043
    else{
3044
    TAOS_CHECK_RETURN(
32,180✔
3045
        mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup, pVgroup->vnodeGid[i].dnodeId, 5000));
3046
    }
3047
  }
3048
  return code; 
12,872✔
3049
}
3050

3051
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
6,903✔
3052
  int32_t code = 0;
6,903✔
3053
  SSdb   *pSdb = pMnode->pSdb;
6,903✔
3054

3055
  int32_t vgid = pVgroup->vgId;
6,903✔
3056
  int8_t  replica = pVgroup->replica;
6,903✔
3057

3058
  if (pVgroup->replica <= 1) {
6,903✔
3059
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
9✔
3060
    return -1;
9✔
3061
  }
3062

3063
  int32_t dnodeId = 0;
6,894✔
3064

3065
  for (int i = 0; i < replica; i++) {
12,176✔
3066
    if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER) {
11,718✔
3067
      dnodeId = pVgroup->vnodeGid[i].dnodeId;
6,436✔
3068
      break;
6,436✔
3069
    }
3070
  }
3071

3072
  bool       exist = false;
6,894✔
3073
  bool       online = false;
6,894✔
3074
  int64_t    curMs = taosGetTimestampMs();
6,894✔
3075
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
6,894✔
3076
  if (pDnode != NULL) {
6,894✔
3077
    exist = true;
6,436✔
3078
    online = mndIsDnodeOnline(pDnode, curMs);
6,436✔
3079
    mndReleaseDnode(pMnode, pDnode);
6,436✔
3080
  }
3081

3082
  if (exist && online) {
13,330✔
3083
    mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgid, dnodeId);    
6,436✔
3084
    TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));
6,436✔
3085

3086
    if ((code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId)) != 0) {
6,436✔
3087
      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
×
3088
      TAOS_RETURN(code);
×
3089
    }
3090

3091
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));
6,436✔
3092

3093
    TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));
6,436✔
3094

3095
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
6,436✔
3096
    if (pDb == NULL) {
6,436✔
3097
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3098
      if (terrno != 0) code = terrno;
×
3099
      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
×
3100
      TAOS_RETURN(code);
×
3101
    }
3102

3103
    mndReleaseDb(pMnode, pDb);
6,436✔
3104
  } else {
3105
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
458✔
3106
          online);
3107
  }
3108

3109
  TAOS_RETURN(code);
6,894✔
3110
}
3111

3112
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);
3113

3114
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) { return mndProcessVgroupBalanceLeaderMsgImp(pReq); }
2,480✔
3115

3116
#ifndef TD_ENTERPRISE
3117
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) { return 0; }
3118
#endif
3119

3120
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
209,007✔
3121
                                   SVgObj *pNewVgroup, SArray *pArray) {
3122
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
606,686✔
3123
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
397,679✔
3124
    bool       inVgroup = false;
397,679✔
3125
    int64_t    oldMemUsed = 0;
397,679✔
3126
    int64_t    newMemUsed = 0;
397,679✔
3127
    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
397,679✔
3128
           pDnode->id, pDnode->memAvail, pDnode->memUsed);
3129
    for (int32_t j = 0; j < pOldVgroup->replica; ++j) {
1,112,518✔
3130
      SVnodeGid *pVgId = &pOldVgroup->vnodeGid[j];
714,839✔
3131
      if (pDnode->id == pVgId->dnodeId) {
714,839✔
3132
        oldMemUsed = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
314,727✔
3133
        inVgroup = true;
314,727✔
3134
      }
3135
    }
3136
    for (int32_t j = 0; j < pNewVgroup->replica; ++j) {
1,112,518✔
3137
      SVnodeGid *pVgId = &pNewVgroup->vnodeGid[j];
714,839✔
3138
      if (pDnode->id == pVgId->dnodeId) {
714,839✔
3139
        newMemUsed = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
314,727✔
3140
        inVgroup = true;
314,727✔
3141
      }
3142
    }
3143

3144
    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
397,679✔
3145
           pNewVgroup->vgId, pDnode->id, oldMemUsed, newMemUsed);
3146

3147
    pDnode->memUsed = pDnode->memUsed - oldMemUsed + newMemUsed;
397,679✔
3148
    if (pDnode->memAvail - pDnode->memUsed <= 0) {
397,679✔
3149
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
×
3150
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
3151
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
3152
    } else if (inVgroup) {
397,679✔
3153
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
314,727✔
3154
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
3155
    } else {
3156
    }
3157
  }
3158
  return 0;
209,007✔
3159
}
3160

3161
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
244,232✔
3162
                                  SArray *pArray, SVgObj *pNewVgroup) {
3163
  int32_t code = 0;
244,232✔
3164
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
244,232✔
3165

3166
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
244,232✔
3167
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
209,007✔
3168
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
209,007✔
3169
    return 0;
209,007✔
3170
  }
3171

3172
  // mndTransSetGroupParallel(pTrans);
3173

3174
  if (pNewDb->cfg.replications == 3) {
35,225✔
3175
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
28,830✔
3176
          pVgroup->vnodeGid[0].dnodeId);
3177

3178
    // add second
3179
    if (pNewVgroup->replica == 1) {
28,830✔
3180
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
28,830✔
3181
    }
3182

3183
    // learner stage
3184
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
28,097✔
3185
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
28,097✔
3186
    TAOS_CHECK_RETURN(
28,097✔
3187
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3188

3189
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
28,097✔
3190

3191
    // follower stage
3192
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
28,097✔
3193
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
28,097✔
3194
    TAOS_CHECK_RETURN(
28,097✔
3195
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3196

3197
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
28,097✔
3198

3199
    // add third
3200
    if (pNewVgroup->replica == 2) {
28,097✔
3201
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
28,097✔
3202
    }
3203

3204
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,952✔
3205
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,952✔
3206
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,952✔
3207
    TAOS_CHECK_RETURN(
27,952✔
3208
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3209
    TAOS_CHECK_RETURN(
27,952✔
3210
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3211
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
27,952✔
3212

3213
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
27,952✔
3214
  } else if (pNewDb->cfg.replications == 1) {
6,395✔
3215
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
4,929✔
3216
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3217
          pVgroup->vnodeGid[2].dnodeId);
3218

3219
    SVnodeGid del1 = {0};
4,929✔
3220
    SVnodeGid del2 = {0};
4,929✔
3221
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
4,929✔
3222
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
4,929✔
3223
    TAOS_CHECK_RETURN(
4,929✔
3224
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3225
    TAOS_CHECK_RETURN(
4,929✔
3226
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3227
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4,929✔
3228

3229
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
4,929✔
3230
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
4,929✔
3231
    TAOS_CHECK_RETURN(
4,929✔
3232
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3233
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4,929✔
3234
  } else if (pNewDb->cfg.replications == 2) {
1,466✔
3235
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,466✔
3236
          pVgroup->vnodeGid[0].dnodeId);
3237

3238
    // add second
3239
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,466✔
3240

3241
    // learner stage
3242
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,466✔
3243
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,466✔
3244
    TAOS_CHECK_RETURN(
1,466✔
3245
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3246

3247
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,466✔
3248

3249
    // follower stage
3250
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,466✔
3251
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,466✔
3252
    TAOS_CHECK_RETURN(
1,466✔
3253
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3254

3255
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,466✔
3256
  } else {
3257
    return -1;
×
3258
  }
3259

3260
  mndSortVnodeGid(pNewVgroup);
34,347✔
3261

3262
  {
3263
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
34,347✔
3264
    if (pVgRaw == NULL) {
34,347✔
3265
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3266
      if (terrno != 0) code = terrno;
×
3267
      TAOS_RETURN(code);
×
3268
    }
3269
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
34,347✔
3270
      sdbFreeRaw(pVgRaw);
×
3271
      TAOS_RETURN(code);
×
3272
    }
3273
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
34,347✔
3274
    if (code != 0) {
34,347✔
3275
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
3276
      TAOS_RETURN(code);
×
3277
    }
3278
  }
3279

3280
  TAOS_RETURN(code);
34,347✔
3281
}
3282

3283
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3284
                                      SArray *pArray) {
3285
  int32_t code = 0;
×
3286
  SVgObj  newVgroup = {0};
×
3287
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3288

3289
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3290
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
3291
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3292
    return 0;
×
3293
  }
3294

3295
  mndTransSetSerial(pTrans);
×
3296

3297
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3298
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3299

3300
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
3301
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3302
          pVgroup->vnodeGid[0].dnodeId);
3303

3304
    // add second
3305
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3306
    // add third
3307
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3308

3309
    // add learner stage
3310
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3311
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3312
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3313
    TAOS_CHECK_RETURN(
×
3314
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3315
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3316
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3317
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3318
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3319
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3320
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3321
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3322
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3323

3324
    // check learner
3325
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3326
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3327
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3328
    TAOS_CHECK_RETURN(
×
3329
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3330
    TAOS_CHECK_RETURN(
×
3331
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3332

3333
    // change raft type
3334
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3335
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3336
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3337
    TAOS_CHECK_RETURN(
×
3338
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3339

3340
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3341

3342
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3343
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3344
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3345
    TAOS_CHECK_RETURN(
×
3346
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3347

3348
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3349

3350
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3351
    if (pVgRaw == NULL) {
×
3352
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3353
      if (terrno != 0) code = terrno;
×
3354
      TAOS_RETURN(code);
×
3355
    }
3356
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3357
      sdbFreeRaw(pVgRaw);
×
3358
      TAOS_RETURN(code);
×
3359
    }
3360
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3361
    if (code != 0) {
×
3362
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3363
             __LINE__);
3364
      TAOS_RETURN(code);
×
3365
    }
3366
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3367
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3368
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3369

3370
    SVnodeGid del1 = {0};
×
3371
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3372

3373
    TAOS_CHECK_RETURN(
×
3374
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3375

3376
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3377

3378
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3379

3380
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3381
    if (pVgRaw == NULL) {
×
3382
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3383
      if (terrno != 0) code = terrno;
×
3384
      TAOS_RETURN(code);
×
3385
    }
3386
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3387
      sdbFreeRaw(pVgRaw);
×
3388
      TAOS_RETURN(code);
×
3389
    }
3390
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3391
    if (code != 0) {
×
3392
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3393
             __LINE__);
3394
      TAOS_RETURN(code);
×
3395
    }
3396

3397
    SVnodeGid del2 = {0};
×
3398
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3399

3400
    TAOS_CHECK_RETURN(
×
3401
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3402

3403
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3404

3405
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3406

3407
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3408
    if (pVgRaw == NULL) {
×
3409
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3410
      if (terrno != 0) code = terrno;
×
3411
      TAOS_RETURN(code);
×
3412
    }
3413
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3414
      sdbFreeRaw(pVgRaw);
×
3415
      TAOS_RETURN(code);
×
3416
    }
3417
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3418
    if (code != 0) {
×
3419
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3420
             __LINE__);
3421
      TAOS_RETURN(code);
×
3422
    }
3423
  } else {
3424
    return -1;
×
3425
  }
3426

3427
  mndSortVnodeGid(&newVgroup);
×
3428

3429
  {
3430
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3431
    if (pVgRaw == NULL) {
×
3432
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3433
      if (terrno != 0) code = terrno;
×
3434
      TAOS_RETURN(code);
×
3435
    }
3436
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3437
      sdbFreeRaw(pVgRaw);
×
3438
      TAOS_RETURN(code);
×
3439
    }
3440
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3441
    if (code != 0) {
×
3442
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3443
             __LINE__);
3444
      TAOS_RETURN(code);
×
3445
    }
3446
  }
3447

3448
  TAOS_RETURN(code);
×
3449
}
3450

3451
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
6,070✔
3452
                                         SDnodeObj *pAnotherDnode) {
3453
  int32_t code = 0;
6,070✔
3454
  SVgObj  newVgroup = {0};
6,070✔
3455
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
6,070✔
3456

3457
  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
6,070✔
3458
        pVgroup->vnodeGid[0].dnodeId);
3459

3460
  if (newVgroup.replica == 1) {
6,070✔
3461
    int selected = 0;
×
3462
    for (int i = 0; i < newVgroup.replica; i++) {
×
3463
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3464
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3465
        selected = i;
×
3466
      }
3467
    }
3468
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]));
×
3469
  } else if (newVgroup.replica == 2) {
6,070✔
3470
    for (int i = 0; i < newVgroup.replica; i++) {
×
3471
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3472
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3473
      } else {
3474
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3475
      }
3476
    }
3477
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
×
3478

3479
    for (int i = 0; i < newVgroup.replica; i++) {
×
3480
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3481
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3482
      } else {
3483
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3484
      }
3485
    }
3486
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3487

3488
    for (int i = 0; i < newVgroup.replica; i++) {
×
3489
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3490
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3491
      }
3492
    }
3493
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3494
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
×
3495
  } else if (newVgroup.replica == 3) {
6,070✔
3496
    for (int i = 0; i < newVgroup.replica; i++) {
24,280✔
3497
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
18,210✔
3498
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
6,070✔
3499
      } else {
3500
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
12,140✔
3501
      }
3502
    }
3503
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));
6,070✔
3504

3505
    for (int i = 0; i < newVgroup.replica; i++) {
24,280✔
3506
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
18,210✔
3507
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
18,210✔
3508
      }
3509
    }
3510
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
6,070✔
3511
  }
3512
  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
6,070✔
3513
  if (pVgRaw == NULL) {
6,070✔
3514
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3515
    if (terrno != 0) code = terrno;
×
3516
    TAOS_RETURN(code);
×
3517
  }
3518
  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
6,070✔
3519
    sdbFreeRaw(pVgRaw);
×
3520
    TAOS_RETURN(code);
×
3521
  }
3522
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
6,070✔
3523
  if (code != 0) {
6,070✔
3524
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
×
3525
    TAOS_RETURN(code);
×
3526
  }
3527

3528
  TAOS_RETURN(code);
6,070✔
3529
}
3530

3531
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
3532
  return 0;
×
3533
}
3534

3535
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3536

3537
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
57,373✔
3538
  int32_t         code = 0;
57,373✔
3539
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
57,373✔
3540
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
57,373✔
3541
  if (pRaw == NULL) {
57,373✔
3542
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3543
    if (terrno != 0) code = terrno;
×
3544
    goto _err;
×
3545
  }
3546
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
57,373✔
3547
  code = sdbSetRawStatus(pRaw, vgStatus);
57,373✔
3548
  if (code != 0) {
57,373✔
3549
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
3550
    goto _err;
×
3551
  }
3552
  pRaw = NULL;
57,373✔
3553
  TAOS_RETURN(code);
57,373✔
3554
_err:
×
3555
  sdbFreeRaw(pRaw);
×
3556
  TAOS_RETURN(code);
×
3557
}
3558

3559
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
22,681✔
3560
  int32_t         code = 0;
22,681✔
3561
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
22,681✔
3562
  SSdbRaw        *pRaw = mndDbActionEncode(pDb);
22,681✔
3563
  if (pRaw == NULL) {
22,681✔
3564
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3565
    if (terrno != 0) code = terrno;
×
3566
    goto _err;
×
3567
  }
3568
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
22,681✔
3569
  code = sdbSetRawStatus(pRaw, dbStatus);
22,681✔
3570
  if (code != 0) {
22,681✔
3571
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
×
3572
    goto _err;
×
3573
  }
3574
  pRaw = NULL;
22,681✔
3575
  TAOS_RETURN(code);
22,681✔
3576
_err:
×
3577
  sdbFreeRaw(pRaw);
×
3578
  TAOS_RETURN(code);
×
3579
}
3580

3581
/*
 * Build and prepare a "split-vgroup" transaction for pVgroup, which belongs to pDb.
 *
 * All work is recorded as transaction actions:
 *   1. Reshape the source vgroup: a single-replica vgroup gets a vnode added, and a
 *      three-replica vgroup gets one removed. Afterwards vnodeGid[0] and vnodeGid[1]
 *      back the two halves.
 *   2. Disable writes on every vnode of the reshaped source vgroup.
 *   3. Cut the hash range at hashBegin/2 + hashEnd/2 and give each half to a new
 *      single-replica vgroup, with vgIds maxVgId and maxVgId + 1 from sdbGetMaxId.
 *   4. Mark the source vgroup dropped and bump the db's vgVersion and numOfVgroups.
 *   5. If pDb->cfg.replications differs from 1, re-replicate the new vgroups.
 *
 * Shared-storage builds (USE_SHARED_STORAGE with tsSsEnabled) refuse the split.
 * Returns 0 when the transaction was prepared, otherwise an error code.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    // pArray is handed to the vnode add/remove and re-replication helpers below.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("vgId:%d, db:%s, failed to build dnode array for split vgroup since %s", pVgroup->vgId, pVgroup->dbName,
           tstrerror(code));
    goto _OVER;
  }

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  /*
    if (pDb->cfg.withArbitrator) {
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
      goto _OVER;
    }
  */

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  // Step 1: reshape the source vgroup so that vnodeGid[0] and vnodeGid[1] exist.
  if (newVg1.replica == 1) {
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    // The new vnode joins as a learner first, then is promoted to voter.
    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // goto _OVER;
  }

  // Step 2: stop writes on the source vnodes before the hash range is cut.
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // Step 3: newVg1 keeps the lower half on vnodeGid[0]; newVg2 takes the upper half on vnodeGid[1].
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  // Halve each bound separately so the sum of two uint32 bounds cannot wrap.
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  // Step 4: new vgroups ready, source vgroup dropped (redo stage).
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status; pRetensions is deep-copied because dbObj is freed at _OVER
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // Step 5: adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3743

3744
/* Split-vgroup handler body. When TD_ENTERPRISE is defined, the definition must come from elsewhere. */
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

/* Message entry point for split-vgroup requests; delegates to mndProcessSplitVgroupMsgImp. */
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t result = mndProcessSplitVgroupMsgImp(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
/* Non-enterprise build: accepts the request and does nothing. */
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3751

3752
/*
 * Record in pTrans the move of one replica of pVgroup from pSrc to pDst.
 *
 * A replica is added on pDst and then one is removed from pSrc. The resulting
 * vgroup is appended to the commit log with status READY. pVgroup itself is not
 * modified; the work is done on a local copy.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t code = 0;
  SVgObj  vg = {0};
  memcpy(&vg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", vg.vgId, vg.replica);
  for (int32_t idx = 0; idx < vg.replica; idx++) {
    mInfo("vgId:%d, vnode:%d dnode:%d", vg.vgId, idx, vg.vnodeGid[idx].dnodeId);
  }

  // Grow onto the destination first, then shrink off the source.
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &vg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &vg, pSrc->id));

  SSdbRaw *pCommit = mndVgroupActionEncode(&vg);
  if (pCommit == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommit);
  if (code != 0) {
    // Not appended, so this function still owns the raw.
    sdbFreeRaw(pCommit);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommit, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", vg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", vg.vgId, vg.replica);
  for (int32_t idx = 0; idx < vg.replica; idx++) {
    mInfo("vgId:%d, vnode:%d dnode:%d", vg.vgId, idx, vg.vnodeGid[idx].dnodeId);
  }
  TAOS_RETURN(code);
}
3789

3790
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
15,480✔
3791
                                            SHashObj *pBalancedVgroups) {
3792
  void   *pIter = NULL;
15,480✔
3793
  int32_t code = -1;
15,480✔
3794
  SSdb   *pSdb = pMnode->pSdb;
15,480✔
3795

3796
  while (1) {
9,614✔
3797
    SVgObj *pVgroup = NULL;
25,094✔
3798
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
25,094✔
3799
    if (pIter == NULL) break;
25,094✔
3800
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
25,094✔
3801
      sdbRelease(pSdb, pVgroup);
8,658✔
3802
      continue;
8,658✔
3803
    }
3804

3805
    bool existInSrc = false;
16,436✔
3806
    bool existInDst = false;
16,436✔
3807
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
47,268✔
3808
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
30,832✔
3809
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
30,832✔
3810
      if (pGid->dnodeId == pDst->id) existInDst = true;
30,832✔
3811
    }
3812

3813
    if (!existInSrc || existInDst) {
16,436✔
3814
      sdbRelease(pSdb, pVgroup);
956✔
3815
      continue;
956✔
3816
    }
3817

3818
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
15,480✔
3819
    if (pDb == NULL) {
15,480✔
3820
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3821
      if (terrno != 0) code = terrno;
×
3822
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3823
      goto _OUT;
×
3824
    }
3825

3826
    if (pDb->cfg.withArbitrator) {
15,480✔
3827
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3828
      goto _OUT;
×
3829
    }
3830

3831
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
15,480✔
3832
    if (code == 0) {
15,480✔
3833
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
15,480✔
3834
    }
3835

3836
  _OUT:
15,480✔
3837
    mndReleaseDb(pMnode, pDb);
15,480✔
3838
    sdbRelease(pSdb, pVgroup);
15,480✔
3839
    sdbCancelFetch(pSdb, pIter);
15,480✔
3840
    break;
15,480✔
3841
  }
3842

3843
  return code;
15,480✔
3844
}
3845

3846
/*
 * Plan vgroup moves inside one "balance-vgroup" transaction.
 *
 * On each round, pArray (SDnodeObj entries) is sorted with mndCompareDnodeVnodes.
 * One vgroup is then moved from the last entry (pSrc) to the first (pDst), as long
 * as pSrc's projected score after losing a vnode still exceeds pDst's projected
 * score after gaining one. The numOfVnodes counters in pArray are updated in place
 * after every planned move.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction was prepared, 0 when
 * nothing needed moving, otherwise an error code.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfSupportVnodes, pDnode->numOfOtherNodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Projected scores as if one vnode had already moved from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Capture the prepare result; otherwise a stale code from the loop (possibly 0) would be returned.
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3914

3915
/*
 * Handle a balance-vgroup request.
 *
 * The request is rejected in these cases:
 *   - it cannot be deserialized;
 *   - the user lacks MND_OPER_BALANCE_VGROUP;
 *   - any SDB_MOUNT objects exist;
 *   - any dnode is offline.
 * Otherwise, when at least two dnodes are returned by mndBuildDnodesArray, the
 * work is handed to mndBalanceVgroup. An audit record is written once the
 * checks have passed.
 *
 * Returns the result of mndBalanceVgroup (TSDB_CODE_ACTION_IN_PROGRESS when a
 * transaction was prepared), 0 when there is nothing to do, otherwise an error code.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // code is set locally and terrno is not, so format code rather than terrstr().
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
3978

3979
/* True when pVgroup is not a TSMA vgroup and its dbUid equals dbUid. */
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
96,011,988✔
3980

3981
/* True when any of the first pVgroup->replica vnodes is placed on dnodeId. */
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  int32_t idx = 0;
  while (idx < pVgroup->replica) {
    if (pVgroup->vnodeGid[idx].dnodeId == dnodeId) {
      return true;
    }
    ++idx;
  }
  return false;
}
3987

3988
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
127,410✔
3989
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
3990
                                     ETriggerType triggerType) {
3991
  SCompactVnodeReq compactReq = {0};
127,410✔
3992
  compactReq.dbUid = pDb->uid;
127,410✔
3993
  compactReq.compactStartTime = compactTs;
127,410✔
3994
  compactReq.tw = tw;
127,410✔
3995
  compactReq.metaOnly = metaOnly;
127,410✔
3996
  compactReq.force = force;
127,410✔
3997
  compactReq.optrType = type;
127,410✔
3998
  compactReq.triggerType = triggerType;
127,410✔
3999
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
127,410✔
4000

4001
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
127,410✔
4002
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
127,410✔
4003
  if (contLen < 0) {
127,410✔
4004
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4005
    return NULL;
×
4006
  }
4007
  contLen += sizeof(SMsgHead);
127,410✔
4008

4009
  void *pReq = taosMemoryMalloc(contLen);
127,410✔
4010
  if (pReq == NULL) {
127,410✔
4011
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4012
    return NULL;
×
4013
  }
4014

4015
  SMsgHead *pHead = pReq;
127,410✔
4016
  pHead->contLen = htonl(contLen);
127,410✔
4017
  pHead->vgId = htonl(pVgroup->vgId);
127,410✔
4018

4019
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
127,410✔
4020
    taosMemoryFree(pReq);
×
4021
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4022
    return NULL;
×
4023
  }
4024
  *pContLen = contLen;
127,410✔
4025
  return pReq;
127,410✔
4026
}
4027

4028
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
62,093✔
4029
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4030
                                        ETriggerType triggerType) {
4031
  int32_t      code = 0;
62,093✔
4032
  STransAction action = {0};
62,093✔
4033
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
62,093✔
4034

4035
  int32_t contLen = 0;
62,093✔
4036
  void   *pReq =
4037
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
62,093✔
4038
  if (pReq == NULL) {
62,093✔
4039
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4040
    if (terrno != 0) code = terrno;
×
4041
    TAOS_RETURN(code);
×
4042
  }
4043

4044
  action.pCont = pReq;
62,093✔
4045
  action.contLen = contLen;
62,093✔
4046
  action.msgType = TDMT_VND_COMPACT;
62,093✔
4047

4048
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
62,093✔
4049
    taosMemoryFree(pReq);
×
4050
    TAOS_RETURN(code);
×
4051
  }
4052

4053
  TAOS_RETURN(code);
62,093✔
4054
}
4055

4056
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
62,093✔
4057
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4058
                                    ETriggerType triggerType) {
4059
  TAOS_CHECK_RETURN(
62,093✔
4060
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4061
  return 0;
62,093✔
4062
}
4063

4064
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
65,317✔
4065
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4066
  int32_t      code = 0;
65,317✔
4067
  STransAction action = {0};
65,317✔
4068
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
65,317✔
4069

4070
  int32_t contLen = 0;
65,317✔
4071
  // reuse SCompactVnodeReq as SVTrimDbReq
4072
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
65,317✔
4073
  if (pReq == NULL) {
65,317✔
4074
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4075
    if (terrno != 0) code = terrno;
×
4076
    TAOS_RETURN(code);
×
4077
  }
4078

4079
  action.pCont = pReq;
65,317✔
4080
  action.contLen = contLen;
65,317✔
4081
  action.msgType = TDMT_VND_TRIM;
65,317✔
4082

4083
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
65,317✔
4084
    taosMemoryFree(pReq);
×
4085
    TAOS_RETURN(code);
×
4086
  }
4087

4088
  TAOS_RETURN(code);
65,317✔
4089
}
4090

4091
/*
 * Handle a set-vgroup-keep-version request.
 *
 * Builds one transaction that:
 *   - writes the vgroup with updated keepVersion/keepVersionTime to the commit log
 *     (status READY);
 *   - sends TDMT_VND_SET_KEEP_VERSION to the dnode of every replica as a redo
 *     action. TSDB_CODE_VND_STOPPED is accepted as a reply.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 * otherwise an error code.
 */
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup; the reference is released once at _OVER on every path.
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Update SVgObj's keepVersion in mnode
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Persist the updated vgroup through the transaction's commit log.
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  // Set the status while this function still owns the raw. After a successful
  // append the transaction holds it, so freeing it here would be a double free.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + (int32_t)sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    // Use the returned code; terrno is not guaranteed to be set on failure.
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("vgId:%d, failed to set keep version since %s", req.vgId, tstrerror(code));
  }
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc