• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.54
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndStb.h"
26
#include "mndStream.h"
27
#include "mndTopic.h"
28
#include "mndTrans.h"
29
#include "mndUser.h"
30
#include "tmisce.h"
31

32
#define VGROUP_VER_NUMBER   1
33
#define VGROUP_RESERVE_SIZE 64
34

35
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
36
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
38
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
39

40
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
42
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
44

45
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
46
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
49

50
/*
 * Module bootstrap for vgroups: describes the SDB_VGROUP table, installs the
 * message handlers and show-table callbacks, then registers the table.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  SSdbTable vgroupTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };

  // Response messages: all of them are handed to mndTransProcessRsp.
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);

  // Request messages handled by this file.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);

  // Show-table support: one retrieve callback and one iterator-free callback per table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
88

89
/* Counterpart of mndInitVgroup(); the body is empty, nothing is released here. */
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}
2,006✔
90

91
/*
 * Serializes a vgroup object into a newly allocated sdb raw record.
 *
 * Field order written: vgId, createdTime, updateTime, version, hashBegin,
 * hashEnd, dbName, dbUid, isTsma, replica, one dnodeId per replica,
 * syncConfChangeVer, then VGROUP_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL on failure (the partially built record is
 * freed and terrno is left non-zero).
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  int32_t code = 0;
  int32_t lino = 0;

  // terrno is pre-set to an error and only cleared once every field is written,
  // so any jump to _OVER before that point is treated as a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    // Only the dnode id of each replica is persisted.
    SVnodeGid *pGid = &pVgroup->vnodeGid[i];
    SDB_SET_INT32(pRaw, dataPos, pGid->dnodeId, _OVER)
  }
  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
130

131
/*
 * Rebuilds a vgroup row from an sdb raw record written by mndVgroupActionEncode().
 *
 * Only soft version 1..VGROUP_VER_NUMBER is accepted. syncConfChangeVer is read
 * only when the remaining data is long enough to hold it plus the reserved
 * area; otherwise the field is left as allocated.
 *
 * Returns the new row, or NULL on failure (the row is freed, terrno non-zero).
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;
  int32_t lino = 0;
  // Pre-set to an error; cleared only after the whole record has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
  // NOTE(review): replica comes straight from the record and is not checked
  // against the capacity of vnodeGid before indexing - confirm callers/storage
  // guarantee it is in range.
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pGid->dnodeId, _OVER)
    // A single-replica vgroup has its only member marked as leader right away.
    if (pVgroup->replica == 1) {
      pGid->syncState = TAOS_SYNC_STATE_LEADER;
    }
  }
  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
  }

  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRow;
  }

  mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
188

189
/*
 * Validation hook for a new vgroup record carried by a transaction.
 *
 * Decodes the raw record and rejects it when sdbGetMaxId() for SDB_VGROUP is
 * greater than the record's vgId. The decoded row is always released before
 * returning.
 *
 * Returns 0 when accepted; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) when the
 * record cannot be decoded; -1 when the id check fails.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int     code = -1;

  SSdbRow *pRow = mndVgroupActionDecode(pRaw);
  if (pRow == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (maxVgId <= pVgroup->vgId) {
    code = 0;
  } else {
    // code keeps its initial value of -1 on this path.
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  }

_OVER:
  if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
220

221
/* sdb insert hook for vgroups: only emits a trace line and reports success (0). */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
225

226
/* sdb delete hook for vgroups: only emits a trace line and reports success (0). */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
230

231
/*
 * sdb update hook: merges a new vgroup row into the existing one.
 *
 * Persistent fields flow pNew -> pOld. Runtime fields flow the other way first
 * (sync state of members that stay on the same dnode, and the table/storage
 * counters are copied pOld -> pNew), after which the whole vnodeGid array of
 * pNew is copied over pOld. Always returns 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;

  // pOld->replica was overwritten just above, so the inner bound below is the
  // NEW replica count, not the previous one.
  for (int32_t n = 0; n < pNew->replica; ++n) {
    SVnodeGid *pDst = &pNew->vnodeGid[n];
    for (int32_t o = 0; o < pOld->replica; ++o) {
      SVnodeGid *pSrc = &pOld->vnodeGid[o];
      if (pDst->dnodeId != pSrc->dnodeId) continue;

      // Same dnode in both rows: keep the sync status already known for it.
      pDst->syncState = pSrc->syncState;
      pDst->syncRestore = pSrc->syncRestore;
      pDst->syncCanRead = pSrc->syncCanRead;
    }
  }

  // Counters are not replaced by the update; the old values are carried into pNew.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  return 0;
}
260

261
/*
 * Looks up a vgroup by id through sdbAcquire().
 *
 * When the lookup fails with TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten
 * to TSDB_CODE_MND_VGROUP_NOT_EXIST. Returns the object or NULL.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pObj = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pObj != NULL) {
    return pObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
269

270
/* Hands a vgroup obtained from mndAcquireVgroup() back via sdbRelease(). */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
1,009,636✔
274

275
/*
 * Builds a serialized SCreateVnodeReq for one member (pDnode) of a vgroup.
 *
 * Database options are copied from pDb->cfg, the vgroup identity/hash range
 * from pVgroup, and every member of the vgroup is listed either in replicas[]
 * (voters) or learnerReplicas[] (everything else). selfIndex/learnerSelfIndex
 * is the position of pDnode inside the respective list.
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented, including on the
 * error paths that return after the increment.
 *
 * Returns a buffer allocated with taosMemoryMalloc() whose length is stored in
 * *pContLen, or NULL on failure. *pContLen is written only on success.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};

  // Identity of the vgroup and its database.
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;

  // Database configuration.
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.s3ChunkSize = pDb->cfg.s3ChunkSize;
  createReq.s3KeepLocal = pDb->cfg.s3KeepLocal;
  createReq.s3Compact = pDb->cfg.s3Compact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;

  // Membership is filled in by the loop below; -1 means "this dnode not found yet".
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;

  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;  // pointer copy, not a deep copy
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;  // pointer copy, not a deep copy
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
  // Bump the vgroup's config-change version and send the new value.
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);
  createReq.encryptAlgorithm = pDb->cfg.encryptAlgorithm;
  int32_t code = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Next free slot of the list matching this member's role.
    SReplica *pReplica =
        isVoter ? &createReq.replicas[createReq.replica] : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // Record the slot index when this member is the target dnode, then advance
    // the counter of the list that was just filled.
    if (isVoter) {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  // The target dnode must be a member of the vgroup in one of the two lists.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  // First pass with a NULL buffer yields the encoded size.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
404

405
/*
 * Builds an alter-vnode-config message: an SMsgHead followed by a serialized
 * SAlterVnodeConfigReq filled from pDb->cfg and pVgroup->version.
 *
 * Returns a buffer allocated with taosMemoryMalloc(); *pContLen receives the
 * total length including the SMsgHead. Returns NULL on failure, in which case
 * *pContLen is not written.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.s3KeepLocal = pDb->cfg.s3KeepLocal;
  alterReq.s3Compact = pDb->cfg.s3Compact;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // Size of the serialized request alone, without the message head.
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Head fields are stored in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts right after the head, so only bodyLen bytes are available
  // there; passing the full contLen would overstate the buffer by the head size.
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
455

456
/*
 * Builds a serialized SAlterVnodeReplicaReq addressed to the member that runs
 * on dnodeId.
 *
 * Every member of pVgroup is placed in replicas[] (voters) or
 * learnerReplicas[] (all other roles).
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented.
 *
 * Returns a taosMemoryMalloc() buffer with its length in *pContLen, or NULL on
 * failure (*pContLen untouched).
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Claim the next slot of the matching list before resolving the dnode.
    SReplica *pSlot = NULL;
    if (isVoter) {
      pSlot = &alterReq.replicas[alterReq.replica];
      alterReq.replica++;
    } else {
      pSlot = &alterReq.learnerReplicas[alterReq.learnerReplica];
      alterReq.learnerReplica++;
    }

    SDnodeObj *pMember = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pMember == NULL) return NULL;

    pSlot->id = pMember->id;
    pSlot->port = pMember->port;
    memcpy(pSlot->fqdn, pMember->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pMember);

    // NOTE(review): the self index stored here is v, the position in vnodeGid,
    // whereas mndBuildCreateVnodeReq stores the position inside the per-role
    // list. The two differ once voters and learners are mixed - confirm which
    // one the receiver expects.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = v;
      } else {
        alterReq.learnerSelfIndex = v;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  // dnodeId has to be one of the members listed above.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t len = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pBuf, len, &alterReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
538

539
/*
 * Builds a serialized SCheckLearnCatchupReq addressed to the member that runs
 * on dnodeId. The member lists are built the same way as in
 * mndBuildAlterVnodeReplicaReq, and the request is encoded with
 * tSerializeSAlterVnodeReplicaReq.
 *
 * Returns a taosMemoryMalloc() buffer with its length in *pContLen, or NULL on
 * failure (*pContLen untouched).
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    bool       isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Claim the next slot of the matching list before resolving the dnode.
    SReplica *pSlot = NULL;
    if (isVoter) {
      pSlot = &req.replicas[req.replica];
      req.replica++;
    } else {
      pSlot = &req.learnerReplicas[req.learnerReplica];
      req.learnerReplica++;
    }

    SDnodeObj *pMember = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pMember == NULL) return NULL;

    pSlot->id = pMember->id;
    pSlot->port = pMember->port;
    memcpy(pSlot->fqdn, pMember->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pMember);

    // NOTE(review): the index recorded is v (position in vnodeGid), not the
    // position inside the per-role list - confirm against the receiver.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = v;
      } else {
        req.learnerSelfIndex = v;
      }
    }
  }

  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  // dnodeId has to be one of the members listed above.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  int32_t len = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pBuf, len, &req) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
616

617
/*
 * Builds a serialized SDisableVnodeWriteReq for vgId with disable set to 1.
 *
 * Returns a taosMemoryMalloc() buffer with its length in *pContLen, or NULL
 * with terrno = TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {
      .vgId = vgId,
      .disable = 1,
  };

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // Sizing pass, then the real encoding into an exactly-sized buffer.
  int32_t len = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, len, &disableReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
645

646
/*
 * Builds a serialized SAlterVnodeHashRangeReq: source vgroup id srcVgId,
 * destination id and hash range taken from pVgroup.
 *
 * Side effect: pVgroup->syncConfChangeVer is incremented.
 *
 * Returns a taosMemoryMalloc() buffer with its length in *pContLen, or NULL
 * with terrno = TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq alterReq = {
      .srcVgId = srcVgId,
      .dstVgId = pVgroup->vgId,
      .hashBegin = pVgroup->hashBegin,
      .hashEnd = pVgroup->hashEnd,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
  };

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // Sizing pass, then the real encoding into an exactly-sized buffer.
  int32_t len = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, len, &alterReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
678

679
/*
 * Builds a serialized SDropVnodeReq naming the dnode, the vgroup and the
 * database (name and uid).
 *
 * Returns a taosMemoryMalloc() buffer with its length in *pContLen, or NULL
 * with terrno = TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {0};
  dropReq.dnodeId = pDnode->id;
  dropReq.vgId = pVgroup->vgId;
  dropReq.dbUid = pDb->uid;
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);

  // Sizing pass, then the real encoding into an exactly-sized buffer.
  int32_t len = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, len, &dropReq) >= 0) {
    *pContLen = len;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
708

709
/*
 * Per-dnode callback: zeroes the numOfVnodes and numOfOtherNodes counters of
 * the dnode passed in pObj. p1..p3 are unused. Always returns true.
 */
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pTarget = pObj;

  pTarget->numOfOtherNodes = 0;
  pTarget->numOfVnodes = 0;
  return true;
}
715

716
// sdbTraverse callback: refresh one dnode's counters and, if it is usable, append a copy to the output array.
//
// pObj - the SDnodeObj being visited
// p1   - SArray that receives the dnode (pushed by value)
// p2   - pointer to an int32_t dnode id that must be skipped
// p3   - optional SArray of int32_t dnode ids; when non-NULL and non-empty, only listed dnodes are considered
//
// Returns false only when pushing into the output array fails; true otherwise.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = (SDnodeObj *)pObj;
  SArray    *pOut = (SArray *)p1;
  int32_t    skipId = *(int32_t *)p2;
  SArray    *pAllowed = (SArray *)p3;

  if (pDnode->id == skipId) {
    return true;
  }

  // an absent or empty allow-list places no restriction on the dnode
  int32_t numAllowed = (pAllowed != NULL) ? (int32_t)taosArrayGetSize(pAllowed) : 0;
  if (numAllowed > 0) {
    bool listed = false;
    for (int32_t k = 0; k < numAllowed && !listed; ++k) {
      listed = (*(int32_t *)taosArrayGet(pAllowed, k) == pDnode->id);
    }
    if (!listed) {
      return true;
    }
  }

  int64_t nowMs = taosGetTimestampMs();
  bool    online = mndIsDnodeOnline(pDnode, nowMs);
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  if (isMnode) {
    pDnode->numOfOtherNodes++;
  }

  // only online dnodes that support at least one vnode are candidates
  if (!online || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
760

761
// Collect the dnodes that can host vnodes into a newly created SArray of SDnodeObj (stored by value).
//
// exceptDnodeId - dnode id excluded from the result
// dnodeList     - optional allow-list of int32_t dnode ids, forwarded to mndBuildDnodesArrayFp
//
// All dnodes first get their counters reset, then each one is evaluated by mndBuildDnodesArrayFp.
// Returns the array, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the array cannot be created.
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t capacity = mndGetDnodeSize(pMnode);

  SArray *pDnodes = taosArrayInit(capacity, sizeof(SDnodeObj));
  if (pDnodes == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pDnodes, &exceptDnodeId, dnodeList);

  mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pDnodes));
  int32_t collected = (int32_t)taosArrayGetSize(pDnodes);
  for (int32_t idx = 0; idx < collected; ++idx) {
    SDnodeObj *pEntry = taosArrayGet(pDnodes, idx);
    mDebug("dnode:%d, vnodes:%d others:%d", pEntry->id, pEntry->numOfVnodes, pEntry->numOfOtherNodes);
  }
  return pDnodes;
}
781

782
// Three-way comparison of two dnode ids, usable as a sort comparator.
// Returns -1, 0 or 1 when *dnode1Id is less than, equal to, or greater than *dnode2Id.
// The pointers are only read, so they are const-qualified.
static int32_t mndCompareDnodeId(const int32_t *dnode1Id, const int32_t *dnode2Id) {
  const int32_t lhs = *dnode1Id;
  const int32_t rhs = *dnode2Id;
  // comparison-based form avoids the overflow a plain subtraction could hit
  return (lhs > rhs) - (lhs < rhs);
}
788

789
// Load score of a dnode: (vnodes + otherNodes * ratio + additionDnodes) / supportVnodes.
// The comparator in this file sorts ascending by this value.
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  const float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  return weightedLoad / pDnode->numOfSupportVnodes;
}
793

794
// Sort comparator ordering dnodes by ascending score (mndGetDnodeScore with ratio 0.9).
// Returns 1 when pDnode1 scores higher, 0 when the scores compare equal, -1 otherwise.
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);
  if (lhs > rhs) {
    return 1;
  }
  return (lhs == rhs) ? 0 : -1;
}
802

803
// Sort the first `replica` entries of pVgroup->vnodeGid in place by ascending dnodeId (bubble sort).
void mndSortVnodeGid(SVgObj *pVgroup) {
  for (int32_t pass = 0; pass < pVgroup->replica; ++pass) {
    // after each pass the largest remaining dnodeId has settled at the end
    int32_t unsortedEnd = pVgroup->replica - 1 - pass;
    for (int32_t k = 0; k < unsortedEnd; ++k) {
      if (pVgroup->vnodeGid[k + 1].dnodeId < pVgroup->vnodeGid[k].dnodeId) {
        TSWAP(pVgroup->vnodeGid[k], pVgroup->vnodeGid[k + 1]);
      }
    }
  }
}
10,228✔
812

813
// Pick dnodes for every replica of pVgroup from pArray.
//
// pArray is sorted by ascending score and the first `replica` entries are used, one per replica.
// For every chosen dnode the entry in pArray is updated (memUsed, numOfVnodes) so that later
// calls on the same array see the new load. The resulting vnodeGid entries are sorted by dnodeId.
//
// Returns 0 on success, otherwise one of
//   TSDB_CODE_MND_NO_ENOUGH_DNODES       - fewer dnodes than replicas
//   TSDB_CODE_MND_NO_ENOUGH_VNODES       - a chosen dnode already hosts its supported number of vnodes
//   TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE - a chosen dnode lacks memory for the vgroup
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t numOfDnodes = (int32_t)taosArrayGetSize(pArray);
  for (int32_t idx = 0; idx < numOfDnodes; ++idx) {
    SDnodeObj *pEntry = taosArrayGet(pArray, idx);
    mDebug("dnode:%d, score:%f", pEntry->id, mndGetDnodeScore(pEntry, 0, 0.9));
  }

  int32_t size = taosArrayGetSize(pArray);
  if (pVgroup->replica > size) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[r];
    SDnodeObj *pChosen = taosArrayGet(pArray, r);
    if (pChosen == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pChosen->numOfVnodes >= pChosen->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pChosen->memAvail - vgMem - pChosen->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pChosen->id, pChosen->memAvail, pChosen->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pChosen->memUsed += vgMem;

    pGid->dnodeId = pChosen->id;
    // a single-replica vgroup starts as leader; multi-replica members start as followers
    pGid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, r, vgMem, pGid->dnodeId, pChosen->memAvail, pChosen->memUsed);
    pChosen->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
862

863
// Initialise pVgroup as a single-replica TSMA vgroup of pDb and place it on an available dnode.
//
// The vgId is taken from sdbGetMaxId(SDB_VGROUP); creation and update time are set to "now".
//
// Returns 0 on success. On failure returns the error code (also stored in terrno via TAOS_RETURN):
// either the one reported while building the dnode array, or the one from mndGetAvailableDnode.
// The temporary dnode array is released on every path.
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;

  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  // the array is only needed for placement; free it before looking at the result so it cannot leak
  taosArrayDestroy(pArray);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
887

888
// Allocate and place all vgroups of a database.
//
// pDb        - database whose cfg.numOfVgroups / cfg.replications drive the allocation
// ppVgroups  - out: array of cfg.numOfVgroups SVgObj, written only on success
// dnodeList  - optional allow-list of dnode ids forwarded to mndBuildDnodesArray
//
// The uint32 hash space [0, UINT32_MAX] is split into numOfVgroups consecutive ranges; the last
// vgroup absorbs the remainder so the ranges cover the whole space.
//
// Returns 0 on success, otherwise an error code; on failure the vgroup array is freed here.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  // vgroup ids handed out here never go below 2
  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // each vgroup's update time starts equal to its OWN creation time
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
951

952
// Build the endpoint set of a vgroup from the fqdn/port of each replica's dnode.
//
// Replicas whose dnode cannot be acquired are skipped. When a replica is LEADER or ASSIGNED_LEADER,
// inUse is set to the index its endpoint is about to occupy. The set is passed through epsetSort
// before being returned by value.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet result = {0};

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode != NULL) {
      bool leading = (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leading) {
        result.inUse = result.numOfEps;
      }

      if (addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  epsetSort(&result);
  return result;
}
973

974
// Build the endpoint set of the vgroup identified by vgId.
//
// Returns an all-zero SEpSet when the vgroup cannot be acquired. Replicas whose dnode cannot be
// acquired are skipped. Unlike mndGetVgroupEpset, the result is not passed through epsetSort.
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet result = {0};

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return result;
  }

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode != NULL) {
      bool leading = (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leading) {
        result.inUse = result.numOfEps;
      }

      if (addEpIntoEpSet(&result, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return result;
}
998

999
// Show-retrieve callback: emit one row per vgroup into pBlock, at most `rows` rows per call.
//
// When pShow->db is non-empty only vgroups of that database are listed; if the database cannot be
// acquired the call returns 0 rows. Iteration state is kept in pShow->pIter between calls.
//
// Columns written per row, in order: vgId, db name, numOfTables, four (dnodeId, role) pairs
// (NULL for slots beyond `replica`), cacheUsage, numOfCachedTables, isTsma.
//
// Returns the number of rows written (also added to pShow->numOfRows), or the error code of the
// failing column write. On the error path the vgroup being processed and the acquired database
// are released before returning.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  int64_t curMs = taosGetTimestampMs();
  int32_t code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
    if (code != 0) {
      mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
    if (code != 0) {
      mError("vgId:%d, failed to set numOfTables, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i < pVgroup->replica) {
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
        code = colDataSetVal(pColInfo, numOfRows, (const char *)&dnodeId, false);
        if (code != 0) {
          mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
          goto _ERR;
        }

        bool       exist = false;
        bool       online = false;
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
        if (pDnode != NULL) {
          exist = true;
          online = mndIsDnodeOnline(pDnode, curMs);
          mndReleaseDnode(pMnode, pDnode);
        }

        // role text: "dropping" if the dnode is gone, "offline" if it is not online,
        // otherwise the sync state name with an optional star suffix for leaders
        char buf1[20] = {0};
        char role[20] = "offline";
        if (!exist) {
          strcpy(role, "dropping");
        } else if (online) {
          char *star = "";
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
            // "**": not restored and not readable; "*": not restored but readable
            if (!pVgroup->vnodeGid[i].syncRestore && !pVgroup->vnodeGid[i].syncCanRead) {
              star = "**";
            } else if (!pVgroup->vnodeGid[i].syncRestore && pVgroup->vnodeGid[i].syncCanRead) {
              star = "*";
            }
          }
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
        }
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        code = colDataSetVal(pColInfo, numOfRows, (const char *)buf1, false);
        if (code != 0) {
          mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code));
          goto _ERR;
        }
      } else {
        // unused replica slot: both the dnode column and the role column are NULL
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
      }
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
    if (code != 0) {
      mError("vgId:%d, failed to set cacheUsage, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false);
    if (code != 0) {
      mError("vgId:%d, failed to set numOfCachedTables, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
    if (code != 0) {
      mError("vgId:%d, failed to set isTsma, since %s", pVgroup->vgId, tstrerror(code));
      goto _ERR;
    }
    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

_ERR:
  // drop the references taken above so a failed column write does not leak them
  sdbRelease(pSdb, pVgroup);
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  return code;
}
1164

1165
// Cancel an in-progress SDB_VGROUP iteration identified by pIter.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1169

1170
// sdbTraverse callback: add to *p2 (int32_t counter) the number of replicas of this vgroup
// that live on the dnode whose id is *p1. p3 is ignored. Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = (SVgObj *)pObj;
  const int32_t targetDnode = *(int32_t *)p1;
  int32_t      *pCounter = (int32_t *)p2;

  for (int32_t r = 0; r < pVg->replica; ++r) {
    if (targetDnode == pVg->vnodeGid[r].dnodeId) {
      *pCounter += 1;
    }
  }

  return true;
}
1183

1184
// Count the vnodes hosted on dnodeId by traversing every vgroup in the sdb.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t total = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);
  return total;
}
1189

1190
// Memory footprint, in bytes, that one vnode of pVgroup is accounted for.
//
// pDbInput - database config to use; when NULL the database is acquired via pVgroup->dbName and
//            released again before returning.
//
// Result = buffer * 1024 * 1024 + pages * pageSize * 1024, plus cacheLastSize * 1024 * 1024 when
// cacheLast > 0. Returns 0 when no database object is available.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool callerOwnsDb = (pDbInput != NULL);
  SDbObj    *pDb = callerOwnsDb ? pDbInput : mndAcquireDb(pMnode, pVgroup->dbName);

  int64_t bytes = 0;
  if (pDb != NULL) {
    bytes = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    if (pDb->cfg.cacheLast > 0) {
      bytes += (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
    }
  }

  // only release what was acquired here
  if (!callerOwnsDb) {
    mndReleaseDb(pMnode, pDb);
  }
  return bytes;
}
1209

1210
// sdbTraverse callback: for every replica of this vgroup located on dnode *p1, add the vgroup's
// memory (mndGetVgroupMemory) to the int64_t accumulator *p2. p3 is ignored. Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = (SVgObj *)pObj;
  const int32_t targetDnode = *(int32_t *)p1;
  int64_t      *pTotal = (int64_t *)p2;

  for (int32_t r = 0; r < pVg->replica; ++r) {
    if (targetDnode == pVg->vnodeGid[r].dnodeId) {
      *pTotal += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
  }

  return true;
}
1223

1224
// Sum the memory of all vnodes hosted on dnodeId by traversing every vgroup in the sdb.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t total = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);
  return total;
}
1229

1230
// Show-retrieve callback: emit one row per vnode (vgroup replica) into pBlock.
//
// A new vgroup is fetched only while numOfRows < rows - TSDB_MAX_REPLICA. Iteration state is kept
// in pShow->pIter between calls.
//
// Columns written per row, in order: dnodeId, vgId, db name, sync state, roleTimeMs, startTimeMs,
// syncRestore. When the hosting dnode is not online the state is reported as
// TAOS_SYNC_STATE_OFFLINE and both timestamps as 0. If the dnode cannot be acquired the remaining
// replicas of that vgroup are skipped.
//
// Returns the number of rows written (also added to pShow->numOfRows), or the error code of the
// failing column write. On the error path the dnode (if held) and the vgroup are released first.
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  SVgObj    *pVgroup = NULL;
  SDnodeObj *pDnode = NULL;  // non-NULL only while a row holds a dnode reference
  int32_t    cols = 0;
  int64_t    curMs = taosGetTimestampMs();
  int32_t    code = 0;

  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
      if (code != 0) {
        mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
      if (code != 0) {
        mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }

      // dnode is online?
      pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);

      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncState, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }

      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set roleTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
      if (code != 0) {
        mError("vgId:%d, failed to set startTimeMs, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
      if (code != 0) {
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
        goto _ERR;
      }

      numOfRows++;
      sdbRelease(pSdb, pDnode);
      pDnode = NULL;
    }

    sdbRelease(pSdb, pVgroup);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

_ERR:
  // drop the references still held so a failed column write does not leak them
  if (pDnode != NULL) {
    sdbRelease(pSdb, pDnode);
  }
  sdbRelease(pSdb, pVgroup);
  return code;
}
1327

1328
// Cancel an in-progress vnode listing; the underlying iteration is over SDB_VGROUP.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1332

1333
// Add one replica to pVgroup on the lowest-scored dnode in pArray that does not already host it,
// and append the updated vgroup to pTrans as a redo log in READY status.
//
// pArray is sorted by ascending score. The new vnodeGid entry is written at index `replica`
// with sync state OFFLINE, then `replica` and the dnode's numOfVnodes / memUsed are increased.
//
// Returns 0 on success, otherwise an error code:
//   TSDB_CODE_MND_NO_ENOUGH_DNODES       - every dnode in pArray already hosts this vgroup
//   TSDB_CODE_MND_NO_ENOUGH_VNODES       - the candidate dnode hosts its supported number of vnodes
//   TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE - the candidate dnode lacks memory for the vgroup
//   or the failure reported by encoding / appending / setting the raw status.
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // validate the entry before its id is read below
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    }
    // the first eligible dnode ends the search, whether or not the status update succeeded
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
  TAOS_RETURN(code);
}
1399

1400
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
37✔
1401
                                        SVnodeGid *pDelVgid) {
1402
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
37✔
1403
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
162✔
1404
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
125✔
1405
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
125!
1406
  }
1407

1408
  int32_t code = -1;
37✔
1409
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
49!
1410
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
49✔
1411

1412
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
107✔
1413
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
95✔
1414
      if (pVgid->dnodeId == pDnode->id) {
95✔
1415
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
37✔
1416
        pDnode->memUsed -= vgMem;
37✔
1417
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
37!
1418
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1419
        pDnode->numOfVnodes--;
37✔
1420
        pVgroup->replica--;
37✔
1421
        *pDelVgid = *pVgid;
37✔
1422
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
37✔
1423
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
37✔
1424
        code = 0;
37✔
1425
        goto _OVER;
37✔
1426
      }
1427
    }
1428
  }
1429

1430
_OVER:
×
1431
  if (code != 0) {
37!
1432
    code = TSDB_CODE_APP_ERROR;
×
1433
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1434
    TAOS_RETURN(code);
×
1435
  }
1436

1437
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
107✔
1438
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
70✔
1439
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
70!
1440
  }
1441

1442
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
37✔
1443
  if (pVgRaw == NULL) {
37!
1444
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1445
    if (terrno != 0) code = terrno;
×
1446
    TAOS_RETURN(code);
×
1447
  }
1448
  if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
37!
1449
    sdbFreeRaw(pVgRaw);
×
1450
    TAOS_RETURN(code);
×
1451
  }
1452
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
37✔
1453
  if (code != 0) {
37!
1454
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
1455
  }
1456

1457
  TAOS_RETURN(code);
37✔
1458
}
1459

1460
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1461
                                                   SVnodeGid *pDelVgid) {
1462
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1463
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1464
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1465
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1466
  }
1467

1468
  int32_t code = -1;
×
1469
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1470
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1471

1472
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1473
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1474
      if (pVgid->dnodeId == pDnode->id) {
×
1475
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1476
        pDnode->memUsed -= vgMem;
×
1477
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1478
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1479
        pDnode->numOfVnodes--;
×
1480
        pVgroup->replica--;
×
1481
        *pDelVgid = *pVgid;
×
1482
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1483
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1484
        code = 0;
×
1485
        goto _OVER;
×
1486
      }
1487
    }
1488
  }
1489

1490
_OVER:
×
1491
  if (code != 0) {
×
1492
    code = TSDB_CODE_APP_ERROR;
×
1493
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1494
    TAOS_RETURN(code);
×
1495
  }
1496

1497
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1498
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1499
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1500
  }
1501

1502
  TAOS_RETURN(code);
×
1503
}
1504

1505
// Appends a TDMT_DND_CREATE_VNODE redo action to pTrans, addressed to the dnode named by
// pVgid->dnodeId. TSDB_CODE_VND_ALREADY_EXIST is registered as the action's acceptable code.
//
// The dnode reference is held until the request has been built, because the dnode object is passed to
// mndBuildCreateVnodeReq; it is released on every path before returning.
//
// Returns 0 on success. When the dnode cannot be acquired or the request cannot be built, returns
// terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0); otherwise the code reported by
// mndTransAppendRedoAction. The request buffer is freed here only when appending fails.
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure reason before the release call has a chance to touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1530

1531
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1532
                                       SDnodeObj *pDnode) {
1533
  int32_t      code = 0;
10✔
1534
  STransAction action = {0};
10✔
1535

1536
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1537

1538
  int32_t contLen = 0;
10✔
1539
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
10✔
1540
  if (pReq == NULL) {
10!
1541
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1542
    if (terrno != 0) code = terrno;
×
1543
    TAOS_RETURN(code);
×
1544
  }
1545

1546
  action.pCont = pReq;
10✔
1547
  action.contLen = contLen;
10✔
1548
  action.msgType = TDMT_DND_CREATE_VNODE;
10✔
1549
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
10✔
1550

1551
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
1552
    taosMemoryFree(pReq);
×
1553
    TAOS_RETURN(code);
×
1554
  }
1555

1556
  TAOS_RETURN(code);
10✔
1557
}
1558

1559
// Appends a TDMT_VND_ALTER_CONFIRM redo action to pTrans, addressed to the vgroup's endpoint set.
// The message consists of a bare SMsgHead whose contLen and vgId are stored via htonl.
// Returns 0 on success, terrno if the header cannot be allocated, otherwise the code from
// mndTransAppendRedoAction.
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction confirmAction = {0};
  confirmAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("vgId:%d, build alter vnode confirm req", pVgroup->vgId);

  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsgHead = taosMemoryMalloc(headLen);
  if (pMsgHead == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsgHead->contLen = htonl(headLen);
  pMsgHead->vgId = htonl(pVgroup->vgId);

  confirmAction.pCont = pMsgHead;
  confirmAction.contLen = headLen;
  confirmAction.msgType = TDMT_VND_ALTER_CONFIRM;
  // an incorrect redirect result will cause this error, so it is registered as the retry code
  confirmAction.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;

  int32_t code = mndTransAppendRedoAction(pTrans, &confirmAction);
  if (code != 0) {
    taosMemoryFree(pMsgHead);
  }
  TAOS_RETURN(code);
}
1587

1588
// Appends a TDMT_SYNC_CONFIG_CHANGE redo action to pTrans, addressed to pNewVgroup's endpoint set.
// The message is an SMsgHead (contLen and vgId stored via htonl) immediately followed by the body
// produced by mndBuildAlterVnodeReplicaReq for pNewVgroup and dnodeId.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the body
// cannot be built; terrno if the combined buffer cannot be allocated; otherwise the code from
// mndTransAppendRedoAction.
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  STransAction cfgAction = {0};
  cfgAction.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  int32_t bodyLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);

  // the body goes right behind the header; the standalone body buffer is no longer needed afterwards
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  cfgAction.pCont = pMsg;
  cfgAction.contLen = msgLen;
  cfgAction.msgType = TDMT_SYNC_CONFIG_CHANGE;

  int32_t code = mndTransAppendRedoAction(pTrans, &cfgAction);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1627

1628
// Appends a TDMT_VND_ALTER_HASHRANGE redo action to pTrans, addressed to pVgroup's endpoint set.
// The request is built by mndBuildAlterVnodeHashRangeReq from srcVgId and pVgroup;
// TSDB_CODE_VND_ALREADY_EXIST is registered as the acceptable code.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request
// cannot be built; otherwise the code from mndTransAppendRedoAction.
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  STransAction rangeAction = {0};
  rangeAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  int32_t reqLen = 0;
  void   *pRangeReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &reqLen);
  if (pRangeReq == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  rangeAction.pCont = pRangeReq;
  rangeAction.contLen = reqLen;
  rangeAction.msgType = TDMT_VND_ALTER_HASHRANGE;
  rangeAction.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;

  int32_t code = mndTransAppendRedoAction(pTrans, &rangeAction);
  if (code != 0) {
    taosMemoryFree(pRangeReq);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1654

1655
// Appends a TDMT_VND_ALTER_CONFIG redo action to pTrans, addressed to pVgroup's endpoint set, with the
// request produced by mndBuildAlterVnodeConfigReq.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request
// cannot be built; otherwise the code from mndTransAppendRedoAction.
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction cfgAction = {0};
  cfgAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  int32_t reqLen = 0;
  void   *pCfgReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &reqLen);
  if (pCfgReq == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  cfgAction.pCont = pCfgReq;
  cfgAction.contLen = reqLen;
  cfgAction.msgType = TDMT_VND_ALTER_CONFIG;

  int32_t code = mndTransAppendRedoAction(pTrans, &cfgAction);
  if (code != 0) {
    taosMemoryFree(pCfgReq);
  }
  TAOS_RETURN(code);
}
1679

1680
// Encodes pVg, appends the raw record to pTrans as a prepare log and then marks the record
// SDB_STATUS_CREATING.
//
// The raw record is freed here only when appending it to the transaction fails. Once the append has
// succeeded the record is not freed by this function, even if setting its status fails.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding
// fails; the code from mndTransAppendPrepareLog if the append fails; the code from sdbSetRawStatus
// if the status cannot be set (previously that result was dropped and success was reported).
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
1704

1705
// Appends a TDMT_VND_ALTER_REPLICA redo action to pTrans, addressed to the dnode identified by dnodeId.
// The dnode is acquired only long enough to copy its endpoint set; the request itself is built by
// mndBuildAlterVnodeReplicaReq from pDb, pVgroup and dnodeId.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the code from mndTransAppendRedoAction.
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  STransAction alterAction = {0};
  alterAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pAlterReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pAlterReq == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  alterAction.pCont = pAlterReq;
  alterAction.contLen = reqLen;
  alterAction.msgType = TDMT_VND_ALTER_REPLICA;

  int32_t code = mndTransAppendRedoAction(pTrans, &alterAction);
  if (code != 0) {
    taosMemoryFree(pAlterReq);
  }
  TAOS_RETURN(code);
}
1737

1738
// Appends a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action to pTrans, addressed to the dnode
// identified by dnodeId. TSDB_CODE_VND_ALREADY_IS_VOTER is registered as the acceptable code and
// TSDB_CODE_VND_NOT_CATCH_UP as the retry code.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the code from mndTransAppendRedoAction.
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  STransAction checkAction = {0};
  checkAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCheckReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pCheckReq == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  checkAction.pCont = pCheckReq;
  checkAction.contLen = reqLen;
  checkAction.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  checkAction.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  checkAction.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;

  int32_t code = mndTransAppendRedoAction(pTrans, &checkAction);
  if (code != 0) {
    taosMemoryFree(pCheckReq);
  }
  TAOS_RETURN(code);
}
1772

1773
// Appends a TDMT_DND_ALTER_VNODE_TYPE redo action to pTrans, addressed to the dnode identified by
// dnodeId, reusing the request layout produced by mndBuildAlterVnodeReplicaReq.
// TSDB_CODE_VND_ALREADY_IS_VOTER is registered as the acceptable code and TSDB_CODE_VND_NOT_CATCH_UP
// as the retry code.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the code from mndTransAppendRedoAction.
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  STransAction typeAction = {0};
  typeAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pTypeReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pTypeReq == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  typeAction.pCont = pTypeReq;
  typeAction.contLen = reqLen;
  typeAction.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  typeAction.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  typeAction.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;

  int32_t code = mndTransAppendRedoAction(pTrans, &typeAction);
  if (code != 0) {
    taosMemoryFree(pTypeReq);
  }
  TAOS_RETURN(code);
}
1807

1808
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
10✔
1809
                                          SDnodeObj *pDnode) {
1810
  int32_t      code = 0;
10✔
1811
  STransAction action = {0};
10✔
1812
  action.epSet = mndGetDnodeEpset(pDnode);
10✔
1813

1814
  int32_t contLen = 0;
10✔
1815
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
10✔
1816
  if (pReq == NULL) {
10!
1817
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1818
    if (terrno != 0) code = terrno;
×
1819
    TAOS_RETURN(code);
×
1820
  }
1821

1822
  action.pCont = pReq;
10✔
1823
  action.contLen = contLen;
10✔
1824
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
10✔
1825
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
10✔
1826
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
10✔
1827

1828
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
10!
1829
    taosMemoryFree(pReq);
×
1830
    TAOS_RETURN(code);
×
1831
  }
1832

1833
  TAOS_RETURN(code);
10✔
1834
}
1835

1836
// Appends a TDMT_VND_DISABLE_WRITE redo action to pTrans, addressed to the dnode identified by
// dnodeId; the request is built by mndBuildDisableVnodeWriteReq for pVgroup->vgId.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the code from mndTransAppendRedoAction.
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  STransAction disableAction = {0};
  disableAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pDisableReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pDisableReq == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  disableAction.pCont = pDisableReq;
  disableAction.contLen = reqLen;
  disableAction.msgType = TDMT_VND_DISABLE_WRITE;

  int32_t code = mndTransAppendRedoAction(pTrans, &disableAction);
  if (code != 0) {
    taosMemoryFree(pDisableReq);
  }
  TAOS_RETURN(code);
}
1869

1870
// Appends a TDMT_DND_DROP_VNODE action to pTrans, addressed to the dnode named by pVgid->dnodeId.
// The action is added as a redo action when isRedo is true and as an undo action otherwise;
// TSDB_CODE_VND_NOT_EXIST is registered as the acceptable code.
//
// The dnode reference is held until the request has been built, because the dnode object is passed to
// mndBuildDropVnodeReq; it is released on every path before returning.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the code from the append call.
// The request buffer is freed here only when appending fails.
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure reason before the release call has a chance to touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if (isRedo) {
    code = mndTransAppendRedoAction(pTrans, &action);
  } else {
    code = mndTransAppendUndoAction(pTrans, &action);
  }
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
1911

1912
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
33✔
1913
                                    SArray *pArray, bool force, bool unsafe) {
1914
  int32_t code = 0;
33✔
1915
  SVgObj  newVg = {0};
33✔
1916
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
33✔
1917

1918
  mInfo("vgId:%d, vgroup info before move, replica:%d", newVg.vgId, newVg.replica);
33!
1919
  for (int32_t i = 0; i < newVg.replica; ++i) {
102✔
1920
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
69!
1921
  }
1922

1923
  if (!force) {
33✔
1924
#if 1
1925
    {
1926
#else
1927
    if (newVg.replica == 1) {
1928
#endif
1929
      mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
29!
1930
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
29✔
1931
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
88✔
1932
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
60!
1933
      }
1934
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
28!
1935
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
28!
1936

1937
      mInfo("vgId:%d, will remove 1 vnode, replca:2", pVgroup->vgId);
28!
1938
      newVg.replica--;
28✔
1939
      SVnodeGid del = newVg.vnodeGid[vnIndex];
28✔
1940
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
28✔
1941
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
28✔
1942
      {
1943
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
28✔
1944
        if (pRaw == NULL) {
28!
1945
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1946
          if (terrno != 0) code = terrno;
×
1947
          TAOS_RETURN(code);
×
1948
        }
1949
        if ((code = mndTransAppendRedolog(pTrans, pRaw)) != 0) {
28!
1950
          sdbFreeRaw(pRaw);
×
1951
          TAOS_RETURN(code);
×
1952
        }
1953
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
28✔
1954
        if (code != 0) {
28!
1955
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
1956
          return code;
×
1957
        }
1958
      }
1959

1960
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
28!
1961
      for (int32_t i = 0; i < newVg.replica; ++i) {
88✔
1962
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
60!
1963
      }
1964
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
28!
1965
#if 1
1966
    }
1967
#else
1968
    } else {  // new replica == 3
1969
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
1970
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
1971
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
1972
      newVg.replica--;
1973
      SVnodeGid del = newVg.vnodeGid[vnIndex];
1974
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
1975
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
1976
      {
1977
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
1978
        if (pRaw == NULL) return -1;
1979
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
1980
          sdbFreeRaw(pRaw);
1981
          return -1;
1982
        }
1983
      }
1984

1985
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
1986
      for (int32_t i = 0; i < newVg.replica; ++i) {
1987
        if (i == vnIndex) continue;
1988
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
1989
      }
1990
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
1991
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
1992
    }
1993
#endif
1994
  } else {
1995
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
4!
1996
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
4!
1997
    newVg.replica--;
4✔
1998
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
1999
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
4✔
2000
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
4✔
2001
    {
2002
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
4✔
2003
      if (pRaw == NULL) {
4!
2004
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2005
        if (terrno != 0) code = terrno;
×
2006
        TAOS_RETURN(code);
×
2007
      }
2008
      if ((code = mndTransAppendRedolog(pTrans, pRaw)) != 0) {
4!
2009
        sdbFreeRaw(pRaw);
×
2010
        TAOS_RETURN(code);
×
2011
      }
2012
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
4✔
2013
      if (code != 0) {
4!
2014
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2015
        return code;
×
2016
      }
2017
    }
2018

2019
    for (int32_t i = 0; i < newVg.replica; ++i) {
12✔
2020
      if (i != vnIndex) {
8✔
2021
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
4!
2022
      }
2023
    }
2024
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
4!
2025
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
4!
2026

2027
    if (newVg.replica == 1) {
4✔
2028
      if (force && !unsafe) {
2!
2029
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
1✔
2030
      }
2031

2032
      SSdb *pSdb = pMnode->pSdb;
1✔
2033
      void *pIter = NULL;
1✔
2034

2035
      while (1) {
3✔
2036
        SStbObj *pStb = NULL;
4✔
2037
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
4✔
2038
        if (pIter == NULL) break;
4✔
2039

2040
        if (strcmp(pStb->db, pDb->name) == 0) {
3✔
2041
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
2!
2042
            sdbCancelFetch(pSdb, pIter);
×
2043
            sdbRelease(pSdb, pStb);
×
2044
            TAOS_RETURN(code);
×
2045
          }
2046
        }
2047

2048
        sdbRelease(pSdb, pStb);
3✔
2049
      }
2050

2051
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
1!
2052
    }
2053
  }
2054

2055
  {
2056
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
31✔
2057
    if (pRaw == NULL) {
31!
2058
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2059
      if (terrno != 0) code = terrno;
×
2060
      TAOS_RETURN(code);
×
2061
    }
2062
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
31!
2063
      sdbFreeRaw(pRaw);
×
2064
      TAOS_RETURN(code);
×
2065
    }
2066
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
31✔
2067
    if (code != 0) {
31!
2068
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2069
      return code;
×
2070
    }
2071
  }
2072

2073
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
31!
2074
  for (int32_t i = 0; i < newVg.replica; ++i) {
98✔
2075
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
67!
2076
  }
2077
  TAOS_RETURN(code);
31✔
2078
}
2079

2080
// Walks every vgroup in the sdb and, for each one that has a replica on delDnodeId, records in pTrans
// the steps that move that replica elsewhere (see mndSetMoveVgroupInfoToTrans). Candidate target
// dnodes come from mndBuildDnodesArray(pMnode, delDnodeId, NULL).
//
// Iteration stops at the first failure. A vgroup whose database cannot be acquired is treated as a
// failure rather than passing a NULL db on, because the move routine dereferences the db object.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// array or a database cannot be obtained; otherwise the code of the failing move.
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // locate the replica of this vgroup that lives on the dnode being removed, if any
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, vnIndex, delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      // abandoning the iteration early requires cancelling the fetch cursor
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2122

2123
// Grows pVgroup by one replica on newDnodeId and records the corresponding steps in pTrans.
// pVgroup is modified in place: the new entry is written at index `replica`, then replica is bumped.
//
// Steps recorded, in order:
//   1. redo log of the enlarged vgroup (status SDB_STATUS_READY), new member as an offline learner;
//   2. alter-replica on every pre-existing member, then create-vnode on the new dnode;
//   3. new member switched to voter: alter-vnode-type on it, alter-replica on the other members;
//   4. alter-confirm on the vgroup.
// Returns 0 on success or the first non-zero code reported by a step.
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // assoc dnode
  SVnodeGid *pNewGid = &pVgroup->vnodeGid[pVgroup->replica];
  pVgroup->replica++;
  pNewGid->dnodeId = newDnodeId;
  pNewGid->syncState = TAOS_SYNC_STATE_OFFLINE;
  pNewGid->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  SSdbRaw *pRedoRaw = mndVgroupActionEncode(pVgroup);
  if (pRedoRaw == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // learner
  for (int32_t idx = 0; idx < pVgroup->replica - 1; ++idx) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pNewGid));

  // voter
  pNewGid->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pNewGid->dnodeId));
  for (int32_t idx = 0; idx < pVgroup->replica - 1; ++idx) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
  }

  // confirm
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2169

2170
// Shrinks pVgroup by the replica hosted on delDnodeId and records the corresponding steps in pTrans.
// pVgroup is modified in place with a swap-remove: the last entry is copied over the removed one and
// the vacated last slot is zeroed.
//
// Steps recorded, in order: redo log of the shrunken vgroup (status SDB_STATUS_READY), drop-vnode for
// the removed replica (as a redo action), alter-replica on every remaining member, alter-confirm.
// Returns 0 without touching anything when no replica lives on delDnodeId; otherwise 0 on success or
// the first non-zero code reported by a step.
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  SVnodeGid *pVictim = NULL;
  for (int32_t idx = 0; idx < pVgroup->replica && pVictim == NULL; ++idx) {
    if (pVgroup->vnodeGid[idx].dnodeId == delDnodeId) {
      pVictim = &pVgroup->vnodeGid[idx];
    }
  }
  if (pVictim == NULL) return 0;

  SVnodeGid removedGid = {0};
  pVgroup->replica--;
  SVnodeGid *pLast = &pVgroup->vnodeGid[pVgroup->replica];
  memcpy(&removedGid, pVictim, sizeof(SVnodeGid));
  memcpy(pVictim, pLast, sizeof(SVnodeGid));
  memset(pLast, 0, sizeof(SVnodeGid));

  SSdbRaw *pRedoRaw = mndVgroupActionEncode(pVgroup);
  if (pRedoRaw == NULL) {
    int32_t failCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(failCode);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &removedGid, true));
  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2215

2216
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
80✔
2217
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2218
                                     SDnodeObj *pOld3) {
2219
  int32_t code = -1;
80✔
2220
  STrans *pTrans = NULL;
80✔
2221

2222
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
80✔
2223
  if (pTrans == NULL) {
80!
2224
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2225
    if (terrno != 0) code = terrno;
×
2226
    goto _OVER;
×
2227
  }
2228

2229
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
80✔
2230
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
80✔
2231

2232
  mndTransSetSerial(pTrans);
79✔
2233
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
79!
2234

2235
  SVgObj newVg = {0};
79✔
2236
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
79✔
2237
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
79!
2238
  for (int32_t i = 0; i < newVg.replica; ++i) {
236✔
2239
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
157!
2240
          syncStr(newVg.vnodeGid[i].syncState));
2241
  }
2242

2243
  if (pNew1 != NULL && pOld1 != NULL) {
79!
2244
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
79✔
2245
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
79✔
2246
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
1!
2247
             pNew1->numOfSupportVnodes);
2248
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
1✔
2249
      goto _OVER;
1✔
2250
    }
2251

2252
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
78✔
2253
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
78!
2254
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2255
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2256
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2257
      goto _OVER;
×
2258
    } else {
2259
      pNew1->memUsed += vgMem;
78✔
2260
    }
2261

2262
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
78!
2263
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
78!
2264
  }
2265

2266
  if (pNew2 != NULL && pOld2 != NULL) {
78!
2267
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
16✔
2268
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
16!
2269
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2270
             pNew2->numOfSupportVnodes);
2271
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2272
      goto _OVER;
×
2273
    }
2274
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
16✔
2275
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
16!
2276
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2277
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2278
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2279
      goto _OVER;
×
2280
    } else {
2281
      pNew2->memUsed += vgMem;
16✔
2282
    }
2283
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
16!
2284
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
16!
2285
  }
2286

2287
  if (pNew3 != NULL && pOld3 != NULL) {
78!
2288
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
8✔
2289
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
8!
2290
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2291
             pNew3->numOfSupportVnodes);
2292
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2293
      goto _OVER;
×
2294
    }
2295
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
8✔
2296
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
8!
2297
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2298
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2299
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2300
      goto _OVER;
×
2301
    } else {
2302
      pNew3->memUsed += vgMem;
8✔
2303
    }
2304
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
8!
2305
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
8!
2306
  }
2307

2308
  {
2309
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
78✔
2310
    if (pRaw == NULL) {
78!
2311
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2312
      if (terrno != 0) code = terrno;
×
2313
      goto _OVER;
×
2314
    }
2315
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
78!
2316
      sdbFreeRaw(pRaw);
×
2317
      goto _OVER;
×
2318
    }
2319
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
78✔
2320
    if (code != 0) {
78!
2321
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2322
      goto _OVER;
×
2323
    }
2324
  }
2325

2326
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
78!
2327
  for (int32_t i = 0; i < newVg.replica; ++i) {
232✔
2328
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
154!
2329
  }
2330

2331
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
78✔
2332
  code = 0;
77✔
2333

2334
_OVER:
80✔
2335
  mndTransDrop(pTrans);
80✔
2336
  mndReleaseDb(pMnode, pDb);
80✔
2337
  TAOS_RETURN(code);
80✔
2338
}
2339

2340
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
95✔
2341
  SMnode    *pMnode = pReq->info.node;
95✔
2342
  SDnodeObj *pNew1 = NULL;
95✔
2343
  SDnodeObj *pNew2 = NULL;
95✔
2344
  SDnodeObj *pNew3 = NULL;
95✔
2345
  SDnodeObj *pOld1 = NULL;
95✔
2346
  SDnodeObj *pOld2 = NULL;
95✔
2347
  SDnodeObj *pOld3 = NULL;
95✔
2348
  SVgObj    *pVgroup = NULL;
95✔
2349
  SDbObj    *pDb = NULL;
95✔
2350
  int32_t    code = -1;
95✔
2351
  int64_t    curMs = taosGetTimestampMs();
95✔
2352
  int32_t    newDnodeId[3] = {0};
95✔
2353
  int32_t    oldDnodeId[3] = {0};
95✔
2354
  int32_t    newIndex = -1;
95✔
2355
  int32_t    oldIndex = -1;
95✔
2356

2357
  SRedistributeVgroupReq req = {0};
95✔
2358
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
95!
2359
    code = TSDB_CODE_INVALID_MSG;
×
2360
    goto _OVER;
×
2361
  }
2362

2363
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
95!
2364
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
95✔
2365
    goto _OVER;
1✔
2366
  }
2367

2368
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
94✔
2369
  if (pVgroup == NULL) {
94✔
2370
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
3✔
2371
    if (terrno != 0) code = terrno;
3!
2372
    goto _OVER;
3✔
2373
  }
2374

2375
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
91✔
2376
  if (pDb == NULL) {
91!
2377
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2378
    if (terrno != 0) code = terrno;
×
2379
    goto _OVER;
×
2380
  }
2381

2382
  if (pVgroup->replica == 1) {
91✔
2383
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
44!
2384
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2385
      goto _OVER;
×
2386
    }
2387

2388
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
44✔
2389
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2390
      code = 0;
1✔
2391
      goto _OVER;
1✔
2392
    }
2393

2394
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
43✔
2395
    if (pNew1 == NULL) {
43!
2396
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2397
      if (terrno != 0) code = terrno;
×
2398
      goto _OVER;
×
2399
    }
2400
    if (!mndIsDnodeOnline(pNew1, curMs)) {
43!
UNCOV
2401
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2402
      goto _OVER;
×
2403
    }
2404

2405
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
43✔
2406
    if (pOld1 == NULL) {
43!
2407
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2408
      if (terrno != 0) code = terrno;
×
2409
      goto _OVER;
×
2410
    }
2411
    if (!mndIsDnodeOnline(pOld1, curMs)) {
43✔
2412
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
2✔
2413
      goto _OVER;
2✔
2414
    }
2415

2416
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
41✔
2417

2418
  } else if (pVgroup->replica == 3) {
47!
2419
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
47!
2420
      code = TSDB_CODE_MND_INVALID_REPLICA;
4✔
2421
      goto _OVER;
4✔
2422
    }
2423

2424
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
43!
2425
      code = TSDB_CODE_MND_INVALID_REPLICA;
1✔
2426
      goto _OVER;
1✔
2427
    }
2428

2429
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
42✔
2430
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
21✔
2431
      newDnodeId[++newIndex] = req.dnodeId1;
18✔
2432
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
18!
2433
    }
2434

2435
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
42✔
2436
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
28✔
2437
      newDnodeId[++newIndex] = req.dnodeId2;
21✔
2438
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
21!
2439
    }
2440

2441
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
42✔
2442
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
34✔
2443
      newDnodeId[++newIndex] = req.dnodeId3;
28✔
2444
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
28!
2445
    }
2446

2447
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
42✔
2448
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
21✔
2449
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
20✔
2450
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
20!
2451
    }
2452

2453
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
42✔
2454
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
28✔
2455
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
21✔
2456
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
21!
2457
    }
2458

2459
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
42✔
2460
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
32✔
2461
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
26✔
2462
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
26!
2463
    }
2464

2465
    if (newDnodeId[0] != 0) {
42✔
2466
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
41✔
2467
      if (pNew1 == NULL) {
41!
2468
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2469
        if (terrno != 0) code = terrno;
×
2470
        goto _OVER;
×
2471
      }
2472
      if (!mndIsDnodeOnline(pNew1, curMs)) {
41✔
2473
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1✔
2474
        goto _OVER;
1✔
2475
      }
2476
    }
2477

2478
    if (newDnodeId[1] != 0) {
41✔
2479
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
17✔
2480
      if (pNew2 == NULL) {
17!
2481
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2482
        if (terrno != 0) code = terrno;
×
2483
        goto _OVER;
×
2484
      }
2485
      if (!mndIsDnodeOnline(pNew2, curMs)) {
17!
2486
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2487
        goto _OVER;
×
2488
      }
2489
    }
2490

2491
    if (newDnodeId[2] != 0) {
41✔
2492
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
9✔
2493
      if (pNew3 == NULL) {
9!
2494
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2495
        if (terrno != 0) code = terrno;
×
2496
        goto _OVER;
×
2497
      }
2498
      if (!mndIsDnodeOnline(pNew3, curMs)) {
9!
2499
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2500
        goto _OVER;
×
2501
      }
2502
    }
2503

2504
    if (oldDnodeId[0] != 0) {
41✔
2505
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
40✔
2506
      if (pOld1 == NULL) {
40!
2507
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2508
        if (terrno != 0) code = terrno;
×
2509
        goto _OVER;
×
2510
      }
2511
      if (!mndIsDnodeOnline(pOld1, curMs)) {
40✔
2512
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1✔
2513
        goto _OVER;
1✔
2514
      }
2515
    }
2516

2517
    if (oldDnodeId[1] != 0) {
40✔
2518
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
16✔
2519
      if (pOld2 == NULL) {
16!
2520
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2521
        if (terrno != 0) code = terrno;
×
2522
        goto _OVER;
×
2523
      }
2524
      if (!mndIsDnodeOnline(pOld2, curMs)) {
16!
2525
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2526
        goto _OVER;
×
2527
      }
2528
    }
2529

2530
    if (oldDnodeId[2] != 0) {
40✔
2531
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
8✔
2532
      if (pOld3 == NULL) {
8!
2533
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2534
        if (terrno != 0) code = terrno;
×
2535
        goto _OVER;
×
2536
      }
2537
      if (!mndIsDnodeOnline(pOld3, curMs)) {
8!
2538
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2539
        goto _OVER;
×
2540
      }
2541
    }
2542

2543
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
40!
2544
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2545
      code = 0;
1✔
2546
      goto _OVER;
1✔
2547
    }
2548

2549
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
39✔
2550

2551
  } else {
2552
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2553
    goto _OVER;
×
2554
  }
2555

2556
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
80✔
2557

2558
  char obj[33] = {0};
80✔
2559
  sprintf(obj, "%d", req.vgId);
80✔
2560

2561
  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
80✔
2562

2563
_OVER:
95✔
2564
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
95✔
2565
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
16!
2566
           req.dnodeId3, tstrerror(code));
2567
  }
2568

2569
  mndReleaseDnode(pMnode, pNew1);
95✔
2570
  mndReleaseDnode(pMnode, pNew2);
95✔
2571
  mndReleaseDnode(pMnode, pNew3);
95✔
2572
  mndReleaseDnode(pMnode, pOld1);
95✔
2573
  mndReleaseDnode(pMnode, pOld2);
95✔
2574
  mndReleaseDnode(pMnode, pOld3);
95✔
2575
  mndReleaseVgroup(pMnode, pVgroup);
95✔
2576
  mndReleaseDb(pMnode, pDb);
95✔
2577
  tFreeSRedistributeVgroupReq(&req);
95✔
2578

2579
  TAOS_RETURN(code);
95✔
2580
}
2581

2582
/*
 * Build a serialized SForceBecomeFollowerReq for `pVgroup`, prefixed with an
 * SMsgHead whose contLen and vgId are stored through htonl().
 *
 * On success returns a buffer allocated with taosMemoryMalloc (the caller takes
 * ownership) and stores its total length, header included, in `*pContLen`.
 * On failure returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY and leaves
 * `*pContLen` untouched. `pMnode` and `dnodeId` are not used by this function.
 */
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // First pass with a NULL buffer only measures the encoded payload.
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload starts after the header, so only bodyLen bytes remain; passing
  // the total length here would overstate the writable space by the header size.
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
2612

2613
/*
 * Append a TDMT_SYNC_FORCE_FOLLOWER redo action for `pVgroup`, addressed to
 * dnode `dnodeId`, to `pTrans`.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is
 * 0). If appending fails, the request buffer is freed and the append error is
 * returned.
 */
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed; the dnode is released right after reading it.
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCont = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &reqLen);
  if (pCont == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  action.pCont = pCont;
  action.contLen = reqLen;
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The action was not appended, so the request buffer is freed here.
    taosMemoryFree(pCont);
  }

  TAOS_RETURN(code);
}
2645

2646
/*
 * Add a leader-balance action for `pVgroup` to `pTrans`.
 *
 * The target is the dnode of the first replica whose syncState is
 * TAOS_SYNC_STATE_LEADER; if no replica is in that state the dnode id stays 0.
 * The action is added only when that dnode can be acquired and is online;
 * otherwise the situation is logged and 0 is returned without adding anything.
 *
 * Returns -1 when the vgroup has at most one replica, 0 when the action was
 * added or skipped, and an error code when adding the action fails or the
 * vgroup's database cannot be acquired.
 */
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans) {
  int32_t code = 0;

  int32_t vgid = pVgroup->vgId;
  int8_t  replica = pVgroup->replica;

  if (pVgroup->replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  int32_t dnodeId = 0;

  for (int i = 0; i < replica; i++) {
    if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER) {
      dnodeId = pVgroup->vnodeGid[i].dnodeId;
      break;
    }
  }

  bool       exist = false;
  bool       online = false;
  int64_t    curMs = taosGetTimestampMs();
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pDnode, curMs);
    mndReleaseDnode(pMnode, pDnode);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
          online);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, dnodeId);

  if ((code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId)) != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  // The database is acquired only to confirm it still exists; it is released at once.
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
2701

2702
// Implementation hook for the balance-vgroup-leader request. When
// TD_ENTERPRISE is defined it has to be supplied by another translation unit.
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

// Entry point for the balance-vgroup-leader request: forwards to the hook.
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  return mndProcessVgroupBalanceLeaderMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: does nothing and reports success.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
2709

2710
/*
 * Re-account vgroup memory on every dnode in `pArray` for a configuration
 * change and verify that no dnode runs out of memory.
 *
 * For each dnode, the memory of `pOldVgroup` (computed with `pOldDb`) is
 * subtracted from `memUsed` if the dnode hosts one of its replicas, and the
 * memory of `pNewVgroup` (computed with `pNewDb`) is added if the dnode hosts
 * one of its replicas. The `memUsed` field of the array elements is updated in
 * place.
 *
 * Returns 0 when every dnode still has memAvail - memUsed > 0, otherwise
 * TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE for the first dnode that does not.
 */
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    bool       inVgroup = false;

    // The replica slots are indexed by j. Indexing them with the dnode-array
    // index i would test the wrong slot, repeat the adjustment once per replica
    // and could read past vnodeGid when pArray has more entries than the
    // replica array.
    for (int32_t j = 0; j < pOldVgroup->replica; ++j) {
      SVnodeGid *pVgId = &pOldVgroup->vnodeGid[j];
      if (pDnode->id == pVgId->dnodeId) {
        pDnode->memUsed -= mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
        inVgroup = true;
      }
    }
    for (int32_t j = 0; j < pNewVgroup->replica; ++j) {
      SVnodeGid *pVgId = &pNewVgroup->vnodeGid[j];
      if (pDnode->id == pVgId->dnodeId) {
        pDnode->memUsed += mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
        inVgroup = true;
      }
    }

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else if (inVgroup) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }
  return 0;
}
2741

2742
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
776✔
2743
                                  SArray *pArray, SVgObj *pNewVgroup) {
2744
  int32_t code = 0;
776✔
2745
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
776✔
2746

2747
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
776!
2748
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
696!
2749
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
696!
2750
    return 0;
696✔
2751
  }
2752

2753
  mndTransSetSerial(pTrans);
80✔
2754

2755
  if (pNewVgroup->replica == 1 && pNewDb->cfg.replications == 3) {
80!
2756
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
76!
2757
          pVgroup->vnodeGid[0].dnodeId);
2758

2759
    // add second
2760
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
76!
2761

2762
    // learner stage
2763
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
76✔
2764
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
76✔
2765
    TAOS_CHECK_RETURN(
76!
2766
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2767

2768
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
76!
2769

2770
    // follower stage
2771
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
76✔
2772
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
76!
2773
    TAOS_CHECK_RETURN(
76!
2774
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2775

2776
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
76!
2777

2778
    // add third
2779
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
76✔
2780

2781
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
66✔
2782
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
66✔
2783
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
66✔
2784
    TAOS_CHECK_RETURN(
66!
2785
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2786
    TAOS_CHECK_RETURN(
66!
2787
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
2788
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
66!
2789

2790
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
66!
2791
  } else if (pNewVgroup->replica == 3 && pNewDb->cfg.replications == 1) {
8!
2792
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
4!
2793
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
2794

2795
    SVnodeGid del1 = {0};
4✔
2796
    SVnodeGid del2 = {0};
4✔
2797
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
4!
2798
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
4!
2799
    TAOS_CHECK_RETURN(
4!
2800
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2801
    TAOS_CHECK_RETURN(
4!
2802
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
2803
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4!
2804

2805
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
4!
2806
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
4!
2807
    TAOS_CHECK_RETURN(
4!
2808
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2809
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
4!
2810
  } else if (pNewVgroup->replica == 1 && pNewDb->cfg.replications == 2) {
×
2811
    mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2812
          pVgroup->vnodeGid[0].dnodeId);
2813

2814
    // add second
2815
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
2816

2817
    // learner stage
2818
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2819
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2820
    TAOS_CHECK_RETURN(
×
2821
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2822

2823
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
×
2824

2825
    // follower stage
2826
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2827
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
×
2828
    TAOS_CHECK_RETURN(
×
2829
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2830

2831
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
2832
  } else {
2833
    return -1;
×
2834
  }
2835

2836
  mndSortVnodeGid(pNewVgroup);
70✔
2837

2838
  {
2839
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
70✔
2840
    if (pVgRaw == NULL) {
70!
2841
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2842
      if (terrno != 0) code = terrno;
×
2843
      TAOS_RETURN(code);
×
2844
    }
2845
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
70!
2846
      sdbFreeRaw(pVgRaw);
×
2847
      TAOS_RETURN(code);
×
2848
    }
2849
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
70✔
2850
    if (code != 0) {
70!
2851
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
2852
      TAOS_RETURN(code);
×
2853
    }
2854
  }
2855

2856
  TAOS_RETURN(code);
70✔
2857
}
2858

2859
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
2860
                                      SArray *pArray) {
2861
  int32_t code = 0;
×
2862
  SVgObj  newVgroup = {0};
×
2863
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
2864

2865
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
2866
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
2867
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
2868
    return 0;
×
2869
  }
2870

2871
  mndTransSetSerial(pTrans);
×
2872

2873
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2874
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
2875

2876
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
2877
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2878
          pVgroup->vnodeGid[0].dnodeId);
2879

2880
    // add second
2881
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
2882
    // add third
2883
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
2884

2885
    // add learner stage
2886
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2887
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2888
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2889
    TAOS_CHECK_RETURN(
×
2890
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2891
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
2892
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2893
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
2894
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2895
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2896
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
2897
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2898
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2899

2900
    // check learner
2901
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2902
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2903
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2904
    TAOS_CHECK_RETURN(
×
2905
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
2906
    TAOS_CHECK_RETURN(
×
2907
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
2908

2909
    // change raft type
2910
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2911
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2912
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2913
    TAOS_CHECK_RETURN(
×
2914
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2915

2916
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2917

2918
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2919
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2920
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2921
    TAOS_CHECK_RETURN(
×
2922
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2923

2924
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2925

2926
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
2927
    if (pVgRaw == NULL) {
×
2928
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2929
      if (terrno != 0) code = terrno;
×
2930
      TAOS_RETURN(code);
×
2931
    }
2932
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
2933
      sdbFreeRaw(pVgRaw);
×
2934
      TAOS_RETURN(code);
×
2935
    }
2936
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
2937
    if (code != 0) {
×
2938
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
2939
             __LINE__);
2940
      TAOS_RETURN(code);
×
2941
    }
2942
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
2943
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
2944
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
2945

2946
    SVnodeGid del1 = {0};
×
2947
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
2948

2949
    TAOS_CHECK_RETURN(
×
2950
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2951

2952
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2953

2954
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
2955

2956
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
2957
    if (pVgRaw == NULL) {
×
2958
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2959
      if (terrno != 0) code = terrno;
×
2960
      TAOS_RETURN(code);
×
2961
    }
2962
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
2963
      sdbFreeRaw(pVgRaw);
×
2964
      TAOS_RETURN(code);
×
2965
    }
2966
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
2967
    if (code != 0) {
×
2968
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
2969
             __LINE__);
2970
      TAOS_RETURN(code);
×
2971
    }
2972

2973
    SVnodeGid del2 = {0};
×
2974
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
2975

2976
    TAOS_CHECK_RETURN(
×
2977
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2978

2979
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2980

2981
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
2982

2983
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
2984
    if (pVgRaw == NULL) {
×
2985
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2986
      if (terrno != 0) code = terrno;
×
2987
      TAOS_RETURN(code);
×
2988
    }
2989
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
2990
      sdbFreeRaw(pVgRaw);
×
2991
      TAOS_RETURN(code);
×
2992
    }
2993
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
2994
    if (code != 0) {
×
2995
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
2996
             __LINE__);
2997
      TAOS_RETURN(code);
×
2998
    }
2999
  } else {
3000
    return -1;
×
3001
  }
3002

3003
  mndSortVnodeGid(&newVgroup);
×
3004

3005
  {
3006
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3007
    if (pVgRaw == NULL) {
×
3008
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3009
      if (terrno != 0) code = terrno;
×
3010
      TAOS_RETURN(code);
×
3011
    }
3012
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3013
      sdbFreeRaw(pVgRaw);
×
3014
      TAOS_RETURN(code);
×
3015
    }
3016
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3017
    if (code != 0) {
×
3018
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3019
             __LINE__);
3020
      TAOS_RETURN(code);
×
3021
    }
3022
  }
3023

3024
  TAOS_RETURN(code);
×
3025
}
3026

3027
// Assign sync roles on a working vgroup copy: the replica located on restoreDnodeId becomes a
// learner when restoreAsLearner is true; every other replica (and the restored one otherwise)
// becomes a voter.
static void mndSetRestoreVnodeRoles(SVgObj *pVgroup, int32_t restoreDnodeId, bool restoreAsLearner) {
  for (int32_t i = 0; i < pVgroup->replica; i++) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[i];
    if (restoreAsLearner && pGid->dnodeId == restoreDnodeId) {
      pGid->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    } else {
      pGid->nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }
}

// Append to pTrans the actions that restore the vnode of pVgroup located on pDnode, then append
// the resulting vgroup object to the commit log with status READY.
//
// The work is done on a local copy of pVgroup; pVgroup itself is only read.
//   replica == 1: the single vnode is re-created as a voter.
//   replica == 2: the peer on pAnotherDnode is first told about the learner role of the restored
//                 vnode, the vnode is re-created on pDnode, then both members are switched back
//                 to an all-voter configuration.
//   replica == 3: the vnode is re-created as a learner and then promoted to voter.
//   any other replica count: no vnode action is added, only the commit log entry.
//
// Returns 0 on success, otherwise the error code of the first failing step.
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);

  if (newVgroup.replica == 1) {
    // With a single replica the only candidate is vnodeGid[0].
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[0]));
  } else if (newVgroup.replica == 2) {
    mndSetRestoreVnodeRoles(&newVgroup, pDnode->id, true);
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

    // Roles are re-applied before each step because the helpers receive a mutable vgroup pointer.
    mndSetRestoreVnodeRoles(&newVgroup, pDnode->id, true);
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

    mndSetRestoreVnodeRoles(&newVgroup, pDnode->id, false);
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
  } else if (newVgroup.replica == 3) {
    mndSetRestoreVnodeRoles(&newVgroup, pDnode->id, true);
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

    mndSetRestoreVnodeRoles(&newVgroup, pDnode->id, false);
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
  }

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
    // The raw is freed here only because the append itself failed.
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3105

3106
// Stub: adds nothing to the transaction and always reports success (0).
// None of the arguments are read.
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;
  return 0;
}
3109

3110
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3111

3112
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
256✔
3113
  int32_t         code = 0;
256✔
3114
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
256✔
3115
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
256✔
3116
  if (pRaw == NULL) {
256!
3117
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3118
    if (terrno != 0) code = terrno;
×
3119
    goto _err;
×
3120
  }
3121
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
256!
3122
  code = sdbSetRawStatus(pRaw, vgStatus);
256✔
3123
  if (code != 0) {
256!
3124
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
3125
    goto _err;
×
3126
  }
3127
  pRaw = NULL;
256✔
3128
  TAOS_RETURN(code);
256✔
3129
_err:
×
3130
  sdbFreeRaw(pRaw);
×
3131
  TAOS_RETURN(code);
×
3132
}
3133

3134
// Encode pDb and append it to the transaction log chosen by `stage`
// (commit log for TRN_STAGE_COMMIT_ACTION, redo log otherwise), then mark the
// encoded raw with dbStatus.
//
// The raw is freed here only when the append itself fails. Once the append
// succeeds it must not be freed by this function, matching the vgroup append
// call sites in this file that do not free on a later sdbSetRawStatus failure.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3155

3156
// Build and prepare a "split-vgroup" transaction that replaces pVgroup with two
// single-replica vgroups, each taking half of the original hash range, and then
// adjusts both back to the replica count configured on pDb.
//
// Rejected up front: databases that still have streams, S3-enabled builds with
// tsS3Enabled set, databases with an arbitrator, and vgroups whose replica count
// is neither 1 nor 3.
//
// Returns 0 once the transaction has been prepared, otherwise an error code.
// The dnode array, the transaction handle and the duplicated retention array
// are released on every path.
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  int32_t numOfStreams = 0;
  if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) {
    goto _OVER;
  }
  if (numOfStreams > 0) {
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
    goto _OVER;
  }

#if defined(USE_S3)
  extern int8_t tsS3Enabled;
  if (tsS3Enabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, s3 exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  if (pDb->cfg.withArbitrator) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);

  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  // Step 1: bring the source vgroup to exactly two replicas.
  if (newVg1.replica == 1) {
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // `code` is 0 at this point, so an explicit error is required; otherwise the
    // caller would be told the split succeeded although nothing was scheduled.
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, replica:%d, split vgroup not supported", pVgroup->vgId, pVgroup->dbName, newVg1.replica);
    goto _OVER;
  }

  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // Step 2: derive the two single-replica halves. newVg1 keeps vnode 0 and the
  // lower half of the hash range; newVg2 takes vnode 1 and the upper half.
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // Duplicate the array so the cleanup below never destroys the array owned by pDb.
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3325

3326
// Actual split-vgroup request handler. When TD_ENTERPRISE is not defined this
// file supplies the no-op definition further down; otherwise the definition is
// expected to come from elsewhere.
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

// Message-handler entry point: forwards the request unchanged to the
// implementation and returns its result.
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  return mndProcessSplitVgroupMsgImp(pReq);
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: ignores the request and returns 0.
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3333

3334
// Add to pTrans the actions that move one replica of pVgroup from dnode pSrc to
// dnode pDst: first a replica is added on pDst, then the replica on pSrc is
// removed, and finally the resulting vgroup object is appended to the commit
// log with status READY. Works on a local copy; pVgroup is only read.
//
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  SVgObj  newVg = {0};
  int32_t code = 0;
  int32_t idx = 0;

  memcpy(&newVg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica);
  idx = 0;
  while (idx < newVg.replica) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, idx, newVg.vnodeGid[idx].dnodeId);
    ++idx;
  }

  // Grow on the destination before shrinking on the source.
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id));

  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVg);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // Freed only because the append itself failed.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica);
  idx = 0;
  while (idx < newVg.replica) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, idx, newVg.vnodeGid[idx].dnodeId);
    ++idx;
  }

  TAOS_RETURN(code);
}
3371

3372
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
24✔
3373
                                            SHashObj *pBalancedVgroups) {
3374
  void   *pIter = NULL;
24✔
3375
  int32_t code = -1;
24✔
3376
  SSdb   *pSdb = pMnode->pSdb;
24✔
3377

3378
  while (1) {
16✔
3379
    SVgObj *pVgroup = NULL;
40✔
3380
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
40✔
3381
    if (pIter == NULL) break;
40!
3382
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
40✔
3383
      sdbRelease(pSdb, pVgroup);
15✔
3384
      continue;
16✔
3385
    }
3386

3387
    bool existInSrc = false;
25✔
3388
    bool existInDst = false;
25✔
3389
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
70✔
3390
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
45✔
3391
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
45✔
3392
      if (pGid->dnodeId == pDst->id) existInDst = true;
45!
3393
    }
3394

3395
    if (!existInSrc || existInDst) {
25!
3396
      sdbRelease(pSdb, pVgroup);
1✔
3397
      continue;
1✔
3398
    }
3399

3400
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
24✔
3401
    if (pDb == NULL) {
24!
3402
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3403
      if (terrno != 0) code = terrno;
×
3404
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
UNCOV
3405
      goto _OUT;
×
3406
    }
3407

3408
    if (pDb->cfg.withArbitrator) {
24!
3409
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
UNCOV
3410
      goto _OUT;
×
3411
    }
3412

3413
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
24✔
3414
    if (code == 0) {
24!
3415
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
24✔
3416
    }
3417

UNCOV
3418
  _OUT:
×
3419
    mndReleaseDb(pMnode, pDb);
24✔
3420
    sdbRelease(pSdb, pVgroup);
24✔
3421
    sdbCancelFetch(pSdb, pIter);
24✔
3422
    break;
24✔
3423
  }
3424

3425
  return code;
24✔
3426
}
3427

3428
// Build a "balance-vgroup" transaction that repeatedly moves one vgroup replica
// from the last dnode of the sorted pArray to the first one, for as long as the
// score comparison below still asks for a move and a movable vgroup is found.
//
// pArray is re-sorted on every round and the numOfVnodes counters of its
// elements are adjusted in place to reflect each scheduled move.
//
// Returns 0 when nothing had to be moved, TSDB_CODE_ACTION_IN_PROGRESS when the
// transaction was prepared, otherwise an error code.
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // Argument order follows the labels: others -> numOfOtherNodes, support -> numOfSupportVnodes.
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    // After sorting, the move source is the last element and the target the first.
    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Scores as they would be after moving one vnode from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Capture the prepare result: `code` may still be 0 from the last successful
    // move, and a failed prepare must not be reported as success.
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3495

3496
// Handle a balance-vgroup request: deserialize it, check the caller's privilege
// for MND_OPER_BALANCE_VGROUP, refuse to run while any dnode is offline, then
// delegate to mndBalanceVgroup when at least two dnodes are available.
//
// An audit record is written only when the request reaches the balancing step.
// Returns the resulting code (0, TSDB_CODE_ACTION_IN_PROGRESS, or an error).
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Report the code set just above; terrno is not updated on this path.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
3554

3555
// True when pVgroup is not flagged isTsma and its dbUid equals the given dbUid.
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
67,861,861!
3556

3557
// True when any of the first `replica` entries of pVgroup->vnodeGid is on dnodeId.
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  int idx = 0;
  while (idx < pVgroup->replica) {
    if (pVgroup->vnodeGid[idx].dnodeId == dnodeId) {
      return true;
    }
    idx++;
  }
  return false;
}
3563

3564
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
50✔
3565
                                     STimeWindow tw) {
3566
  SCompactVnodeReq compactReq = {0};
50✔
3567
  compactReq.dbUid = pDb->uid;
50✔
3568
  compactReq.compactStartTime = compactTs;
50✔
3569
  compactReq.tw = tw;
50✔
3570
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
50✔
3571

3572
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
50!
3573
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
50✔
3574
  if (contLen < 0) {
50!
UNCOV
3575
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
3576
    return NULL;
×
3577
  }
3578
  contLen += sizeof(SMsgHead);
50✔
3579

3580
  void *pReq = taosMemoryMalloc(contLen);
50✔
3581
  if (pReq == NULL) {
50!
UNCOV
3582
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
3583
    return NULL;
×
3584
  }
3585

3586
  SMsgHead *pHead = pReq;
50✔
3587
  pHead->contLen = htonl(contLen);
50✔
3588
  pHead->vgId = htonl(pVgroup->vgId);
50✔
3589

3590
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
50!
UNCOV
3591
    taosMemoryFree(pReq);
×
UNCOV
3592
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
3593
    return NULL;
×
3594
  }
3595
  *pContLen = contLen;
50✔
3596
  return pReq;
50✔
3597
}
3598

3599
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
50✔
3600
                                        STimeWindow tw) {
3601
  int32_t      code = 0;
50✔
3602
  STransAction action = {0};
50✔
3603
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
50✔
3604

3605
  int32_t contLen = 0;
50✔
3606
  void   *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw);
50✔
3607
  if (pReq == NULL) {
50!
UNCOV
3608
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
3609
    if (terrno != 0) code = terrno;
×
UNCOV
3610
    TAOS_RETURN(code);
×
3611
  }
3612

3613
  action.pCont = pReq;
50✔
3614
  action.contLen = contLen;
50✔
3615
  action.msgType = TDMT_VND_COMPACT;
50✔
3616

3617
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
50!
UNCOV
3618
    taosMemoryFree(pReq);
×
UNCOV
3619
    TAOS_RETURN(code);
×
3620
  }
3621

3622
  TAOS_RETURN(code);
50✔
3623
}
3624

3625
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
50✔
3626
                                    STimeWindow tw) {
3627
  TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw));
50!
3628
  return 0;
50✔
3629
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc