• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "mndArbGroup.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndStream.h"
26
#include "mndTopic.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "tmisce.h"
31

32
#define VGROUP_VER_NUMBER   1
33
#define VGROUP_RESERVE_SIZE 64
34

35
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
36
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
37
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
38
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
39

40
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
42
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
44

45
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
46
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
47
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
48
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
49

UNCOV
50
int32_t mndInitVgroup(SMnode *pMnode) {
×
UNCOV
51
  SSdbTable table = {
×
52
      .sdbType = SDB_VGROUP,
53
      .keyType = SDB_KEY_INT32,
54
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
55
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
56
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
57
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
58
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
59
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
60
  };
61

UNCOV
62
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
×
UNCOV
63
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
×
UNCOV
64
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
×
UNCOV
65
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
×
UNCOV
66
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
×
UNCOV
67
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
×
UNCOV
68
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
×
UNCOV
69
  mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
×
UNCOV
70
  mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
×
UNCOV
71
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
×
UNCOV
72
  mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
×
UNCOV
73
  mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);
×
74

UNCOV
75
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
×
UNCOV
76
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
×
77
  // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg);
UNCOV
78
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
×
UNCOV
79
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
×
80

UNCOV
81
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
×
UNCOV
82
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
×
UNCOV
83
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
×
UNCOV
84
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);
×
85

UNCOV
86
  return sdbSetTable(pMnode->pSdb, table);
×
87
}
88

UNCOV
89
void mndCleanupVgroup(SMnode *pMnode) {}
×
90

UNCOV
91
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
×
UNCOV
92
  int32_t code = 0;
×
UNCOV
93
  int32_t lino = 0;
×
UNCOV
94
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95

UNCOV
96
  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
×
UNCOV
97
  if (pRaw == NULL) goto _OVER;
×
98

UNCOV
99
  int32_t dataPos = 0;
×
UNCOV
100
  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
×
UNCOV
101
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
×
UNCOV
102
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
×
UNCOV
103
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
×
UNCOV
104
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
×
UNCOV
105
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
×
UNCOV
106
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
×
UNCOV
107
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
×
UNCOV
108
  SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
×
UNCOV
109
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
×
UNCOV
110
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
111
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
×
UNCOV
112
    SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER)
×
113
  }
UNCOV
114
  SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
×
UNCOV
115
  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
×
UNCOV
116
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
×
117

UNCOV
118
  terrno = 0;
×
119

UNCOV
120
_OVER:
×
UNCOV
121
  if (terrno != 0) {
×
122
    mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
×
123
    sdbFreeRaw(pRaw);
×
124
    return NULL;
×
125
  }
126

UNCOV
127
  mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
×
UNCOV
128
  return pRaw;
×
129
}
130

UNCOV
131
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
×
UNCOV
132
  int32_t code = 0;
×
UNCOV
133
  int32_t lino = 0;
×
UNCOV
134
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
135
  SSdbRow *pRow = NULL;
×
UNCOV
136
  SVgObj  *pVgroup = NULL;
×
137

UNCOV
138
  int8_t sver = 0;
×
UNCOV
139
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
140

UNCOV
141
  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
×
142
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
143
    goto _OVER;
×
144
  }
145

UNCOV
146
  pRow = sdbAllocRow(sizeof(SVgObj));
×
UNCOV
147
  if (pRow == NULL) goto _OVER;
×
148

UNCOV
149
  pVgroup = sdbGetRowObj(pRow);
×
UNCOV
150
  if (pVgroup == NULL) goto _OVER;
×
151

UNCOV
152
  int32_t dataPos = 0;
×
UNCOV
153
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
×
UNCOV
154
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
×
UNCOV
155
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
×
UNCOV
156
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
×
UNCOV
157
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
×
UNCOV
158
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
×
UNCOV
159
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
×
UNCOV
160
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
×
UNCOV
161
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
×
UNCOV
162
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
×
UNCOV
163
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
164
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
×
UNCOV
165
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
×
UNCOV
166
    if (pVgroup->replica == 1) {
×
UNCOV
167
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
×
168
    }
169
  }
UNCOV
170
  if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
×
UNCOV
171
    SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
×
172
  }
173

UNCOV
174
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
×
175

UNCOV
176
  terrno = 0;
×
177

UNCOV
178
_OVER:
×
UNCOV
179
  if (terrno != 0) {
×
180
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
×
181
    taosMemoryFreeClear(pRow);
×
182
    return NULL;
×
183
  }
184

UNCOV
185
  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
×
UNCOV
186
  return pRow;
×
187
}
188

UNCOV
189
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
×
UNCOV
190
  SSdb    *pSdb = pMnode->pSdb;
×
UNCOV
191
  SSdbRow *pRow = NULL;
×
UNCOV
192
  SVgObj  *pVgroup = NULL;
×
UNCOV
193
  int      code = -1;
×
194

UNCOV
195
  pRow = mndVgroupActionDecode(pRaw);
×
UNCOV
196
  if (pRow == NULL) {
×
197
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
198
    if (terrno != 0) code = terrno;
×
199
    goto _OVER;
×
200
  }
UNCOV
201
  pVgroup = sdbGetRowObj(pRow);
×
UNCOV
202
  if (pVgroup == NULL) {
×
203
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
204
    if (terrno != 0) code = terrno;
×
205
    goto _OVER;
×
206
  }
207

UNCOV
208
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
×
UNCOV
209
  if (maxVgId > pVgroup->vgId) {
×
210
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
×
211
    goto _OVER;
×
212
  }
213

UNCOV
214
  code = 0;
×
UNCOV
215
_OVER:
×
UNCOV
216
  if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
×
UNCOV
217
  taosMemoryFreeClear(pRow);
×
UNCOV
218
  TAOS_RETURN(code);
×
219
}
220

UNCOV
221
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
×
UNCOV
222
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
×
UNCOV
223
  return 0;
×
224
}
225

UNCOV
226
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
×
UNCOV
227
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
×
UNCOV
228
  return 0;
×
229
}
230

UNCOV
231
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
×
UNCOV
232
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
×
UNCOV
233
  pOld->updateTime = pNew->updateTime;
×
UNCOV
234
  pOld->version = pNew->version;
×
UNCOV
235
  pOld->hashBegin = pNew->hashBegin;
×
UNCOV
236
  pOld->hashEnd = pNew->hashEnd;
×
UNCOV
237
  pOld->replica = pNew->replica;
×
UNCOV
238
  pOld->isTsma = pNew->isTsma;
×
UNCOV
239
  for (int32_t i = 0; i < pNew->replica; ++i) {
×
UNCOV
240
    SVnodeGid *pNewGid = &pNew->vnodeGid[i];
×
UNCOV
241
    for (int32_t j = 0; j < pOld->replica; ++j) {
×
UNCOV
242
      SVnodeGid *pOldGid = &pOld->vnodeGid[j];
×
UNCOV
243
      if (pNewGid->dnodeId == pOldGid->dnodeId) {
×
UNCOV
244
        pNewGid->syncState = pOldGid->syncState;
×
UNCOV
245
        pNewGid->syncRestore = pOldGid->syncRestore;
×
UNCOV
246
        pNewGid->syncCanRead = pOldGid->syncCanRead;
×
UNCOV
247
        pNewGid->syncAppliedIndex = pOldGid->syncAppliedIndex;
×
UNCOV
248
        pNewGid->syncCommitIndex = pOldGid->syncCommitIndex;
×
249
      }
250
    }
251
  }
UNCOV
252
  pNew->numOfTables = pOld->numOfTables;
×
UNCOV
253
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
×
UNCOV
254
  pNew->totalStorage = pOld->totalStorage;
×
UNCOV
255
  pNew->compStorage = pOld->compStorage;
×
UNCOV
256
  pNew->pointsWritten = pOld->pointsWritten;
×
UNCOV
257
  pNew->compact = pOld->compact;
×
UNCOV
258
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
×
UNCOV
259
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
×
UNCOV
260
  return 0;
×
261
}
262

UNCOV
263
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
×
UNCOV
264
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
265
  SVgObj *pVgroup = sdbAcquire(pSdb, SDB_VGROUP, &vgId);
×
UNCOV
266
  if (pVgroup == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
UNCOV
267
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
268
  }
UNCOV
269
  return pVgroup;
×
270
}
271

UNCOV
272
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
×
UNCOV
273
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
274
  sdbRelease(pSdb, pVgroup);
×
UNCOV
275
}
×
276

UNCOV
277
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
×
UNCOV
278
  SCreateVnodeReq createReq = {0};
×
UNCOV
279
  createReq.vgId = pVgroup->vgId;
×
UNCOV
280
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
281
  createReq.dbUid = pDb->uid;
×
UNCOV
282
  createReq.vgVersion = pVgroup->version;
×
UNCOV
283
  createReq.numOfStables = pDb->cfg.numOfStables;
×
UNCOV
284
  createReq.buffer = pDb->cfg.buffer;
×
UNCOV
285
  createReq.pageSize = pDb->cfg.pageSize;
×
UNCOV
286
  createReq.pages = pDb->cfg.pages;
×
UNCOV
287
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
×
UNCOV
288
  createReq.daysPerFile = pDb->cfg.daysPerFile;
×
UNCOV
289
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
×
UNCOV
290
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
×
UNCOV
291
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
×
UNCOV
292
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
×
UNCOV
293
  createReq.s3ChunkSize = pDb->cfg.s3ChunkSize;
×
UNCOV
294
  createReq.s3KeepLocal = pDb->cfg.s3KeepLocal;
×
UNCOV
295
  createReq.s3Compact = pDb->cfg.s3Compact;
×
UNCOV
296
  createReq.minRows = pDb->cfg.minRows;
×
UNCOV
297
  createReq.maxRows = pDb->cfg.maxRows;
×
UNCOV
298
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
×
UNCOV
299
  createReq.walLevel = pDb->cfg.walLevel;
×
UNCOV
300
  createReq.precision = pDb->cfg.precision;
×
UNCOV
301
  createReq.compression = pDb->cfg.compression;
×
UNCOV
302
  createReq.strict = pDb->cfg.strict;
×
UNCOV
303
  createReq.cacheLast = pDb->cfg.cacheLast;
×
UNCOV
304
  createReq.replica = 0;
×
UNCOV
305
  createReq.learnerReplica = 0;
×
UNCOV
306
  createReq.selfIndex = -1;
×
UNCOV
307
  createReq.learnerSelfIndex = -1;
×
UNCOV
308
  createReq.hashBegin = pVgroup->hashBegin;
×
UNCOV
309
  createReq.hashEnd = pVgroup->hashEnd;
×
UNCOV
310
  createReq.hashMethod = pDb->cfg.hashMethod;
×
UNCOV
311
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
×
UNCOV
312
  createReq.pRetensions = pDb->cfg.pRetensions;
×
UNCOV
313
  createReq.isTsma = pVgroup->isTsma;
×
UNCOV
314
  createReq.pTsma = pVgroup->pTsma;
×
UNCOV
315
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
×
UNCOV
316
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
×
UNCOV
317
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
×
UNCOV
318
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
×
UNCOV
319
  createReq.sstTrigger = pDb->cfg.sstTrigger;
×
UNCOV
320
  createReq.hashPrefix = pDb->cfg.hashPrefix;
×
UNCOV
321
  createReq.hashSuffix = pDb->cfg.hashSuffix;
×
UNCOV
322
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
×
UNCOV
323
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);
×
UNCOV
324
  createReq.encryptAlgorithm = pDb->cfg.encryptAlgorithm;
×
UNCOV
325
  int32_t code = 0;
×
326

UNCOV
327
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
328
    SReplica *pReplica = NULL;
×
329

UNCOV
330
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
UNCOV
331
      pReplica = &createReq.replicas[createReq.replica];
×
332
    } else {
UNCOV
333
      pReplica = &createReq.learnerReplicas[createReq.learnerReplica];
×
334
    }
335

UNCOV
336
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
UNCOV
337
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
338
    if (pVgidDnode == NULL) {
×
339
      return NULL;
×
340
    }
341

UNCOV
342
    pReplica->id = pVgidDnode->id;
×
UNCOV
343
    pReplica->port = pVgidDnode->port;
×
UNCOV
344
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
×
UNCOV
345
    mndReleaseDnode(pMnode, pVgidDnode);
×
346

UNCOV
347
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
UNCOV
348
      if (pDnode->id == pVgid->dnodeId) {
×
UNCOV
349
        createReq.selfIndex = createReq.replica;
×
350
      }
351
    } else {
UNCOV
352
      if (pDnode->id == pVgid->dnodeId) {
×
UNCOV
353
        createReq.learnerSelfIndex = createReq.learnerReplica;
×
354
      }
355
    }
356

UNCOV
357
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
UNCOV
358
      createReq.replica++;
×
359
    } else {
UNCOV
360
      createReq.learnerReplica++;
×
361
    }
362
  }
363

UNCOV
364
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
×
365
    terrno = TSDB_CODE_APP_ERROR;
×
366
    return NULL;
×
367
  }
368

UNCOV
369
  createReq.changeVersion = pVgroup->syncConfChangeVer;
×
370

UNCOV
371
  mInfo(
×
372
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
373
      "changeVersion:%d",
374
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
375
      createReq.strict, createReq.changeVersion);
UNCOV
376
  for (int32_t i = 0; i < createReq.replica; ++i) {
×
UNCOV
377
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
×
378
  }
UNCOV
379
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
×
UNCOV
380
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
×
381
          createReq.learnerReplicas[i].port);
382
  }
383

UNCOV
384
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
×
UNCOV
385
  if (contLen < 0) {
×
386
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
387
    return NULL;
×
388
  }
389

UNCOV
390
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
391
  if (pReq == NULL) {
×
392
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
393
    return NULL;
×
394
  }
395

UNCOV
396
  code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
×
UNCOV
397
  if (code < 0) {
×
398
    terrno = TSDB_CODE_APP_ERROR;
×
399
    taosMemoryFree(pReq);
×
400
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
×
401
    return NULL;
×
402
  }
UNCOV
403
  *pContLen = contLen;
×
UNCOV
404
  return pReq;
×
405
}
406

UNCOV
407
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
×
UNCOV
408
  SAlterVnodeConfigReq alterReq = {0};
×
UNCOV
409
  alterReq.vgVersion = pVgroup->version;
×
UNCOV
410
  alterReq.buffer = pDb->cfg.buffer;
×
UNCOV
411
  alterReq.pageSize = pDb->cfg.pageSize;
×
UNCOV
412
  alterReq.pages = pDb->cfg.pages;
×
UNCOV
413
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
×
UNCOV
414
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
×
UNCOV
415
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
×
UNCOV
416
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
×
UNCOV
417
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
×
UNCOV
418
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
×
UNCOV
419
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
×
UNCOV
420
  alterReq.walLevel = pDb->cfg.walLevel;
×
UNCOV
421
  alterReq.strict = pDb->cfg.strict;
×
UNCOV
422
  alterReq.cacheLast = pDb->cfg.cacheLast;
×
UNCOV
423
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
×
UNCOV
424
  alterReq.minRows = pDb->cfg.minRows;
×
UNCOV
425
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
×
UNCOV
426
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
×
UNCOV
427
  alterReq.s3KeepLocal = pDb->cfg.s3KeepLocal;
×
UNCOV
428
  alterReq.s3Compact = pDb->cfg.s3Compact;
×
429

UNCOV
430
  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
×
UNCOV
431
  int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
×
UNCOV
432
  if (contLen < 0) {
×
433
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
434
    return NULL;
×
435
  }
UNCOV
436
  contLen += sizeof(SMsgHead);
×
437

UNCOV
438
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
439
  if (pReq == NULL) {
×
440
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
441
    return NULL;
×
442
  }
443

UNCOV
444
  SMsgHead *pHead = pReq;
×
UNCOV
445
  pHead->contLen = htonl(contLen);
×
UNCOV
446
  pHead->vgId = htonl(pVgroup->vgId);
×
447

UNCOV
448
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq) < 0) {
×
449
    taosMemoryFree(pReq);
×
450
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
×
451
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
452
    return NULL;
×
453
  }
UNCOV
454
  *pContLen = contLen;
×
UNCOV
455
  return pReq;
×
456
}
457

UNCOV
458
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
×
459
                                          int32_t *pContLen) {
UNCOV
460
  SAlterVnodeReplicaReq alterReq = {
×
UNCOV
461
      .vgId = pVgroup->vgId,
×
UNCOV
462
      .strict = pDb->cfg.strict,
×
463
      .replica = 0,
464
      .learnerReplica = 0,
465
      .selfIndex = -1,
466
      .learnerSelfIndex = -1,
UNCOV
467
      .changeVersion = ++(pVgroup->syncConfChangeVer),
×
468
  };
469

UNCOV
470
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
471
    SReplica *pReplica = NULL;
×
472

UNCOV
473
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
UNCOV
474
      pReplica = &alterReq.replicas[alterReq.replica];
×
UNCOV
475
      alterReq.replica++;
×
476
    } else {
UNCOV
477
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
×
UNCOV
478
      alterReq.learnerReplica++;
×
479
    }
480

UNCOV
481
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
UNCOV
482
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
483
    if (pVgidDnode == NULL) return NULL;
×
484

UNCOV
485
    pReplica->id = pVgidDnode->id;
×
UNCOV
486
    pReplica->port = pVgidDnode->port;
×
UNCOV
487
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
×
UNCOV
488
    mndReleaseDnode(pMnode, pVgidDnode);
×
489

UNCOV
490
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
UNCOV
491
      if (dnodeId == pVgid->dnodeId) {
×
UNCOV
492
        alterReq.selfIndex = v;
×
493
      }
494
    } else {
UNCOV
495
      if (dnodeId == pVgid->dnodeId) {
×
496
        alterReq.learnerSelfIndex = v;
×
497
      }
498
    }
499
  }
500

UNCOV
501
  mInfo(
×
502
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
503
      "changeVersion:%d",
504
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
505
      alterReq.strict, alterReq.changeVersion);
UNCOV
506
  for (int32_t i = 0; i < alterReq.replica; ++i) {
×
UNCOV
507
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
×
508
  }
UNCOV
509
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
×
UNCOV
510
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
×
511
          alterReq.learnerReplicas[i].port);
512
  }
513

UNCOV
514
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
×
515
    terrno = TSDB_CODE_APP_ERROR;
×
516
    return NULL;
×
517
  }
518

UNCOV
519
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
×
UNCOV
520
  if (contLen < 0) {
×
521
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
522
    return NULL;
×
523
  }
524

UNCOV
525
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
526
  if (pReq == NULL) {
×
527
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
528
    return NULL;
×
529
  }
530

UNCOV
531
  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
×
532
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
×
533
    taosMemoryFree(pReq);
×
534
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
535
    return NULL;
×
536
  }
UNCOV
537
  *pContLen = contLen;
×
UNCOV
538
  return pReq;
×
539
}
540

541
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
×
542
                                          int32_t *pContLen) {
543
  SCheckLearnCatchupReq req = {
×
544
      .vgId = pVgroup->vgId,
×
545
      .strict = pDb->cfg.strict,
×
546
      .replica = 0,
547
      .learnerReplica = 0,
548
      .selfIndex = -1,
549
      .learnerSelfIndex = -1,
550
  };
551

552
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
553
    SReplica *pReplica = NULL;
×
554

555
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
556
      pReplica = &req.replicas[req.replica];
×
557
      req.replica++;
×
558
    } else {
559
      pReplica = &req.learnerReplicas[req.learnerReplica];
×
560
      req.learnerReplica++;
×
561
    }
562

563
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
564
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
565
    if (pVgidDnode == NULL) return NULL;
×
566

567
    pReplica->id = pVgidDnode->id;
×
568
    pReplica->port = pVgidDnode->port;
×
569
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
×
570
    mndReleaseDnode(pMnode, pVgidDnode);
×
571

572
    if (pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
573
      if (dnodeId == pVgid->dnodeId) {
×
574
        req.selfIndex = v;
×
575
      }
576
    } else {
577
      if (dnodeId == pVgid->dnodeId) {
×
578
        req.learnerSelfIndex = v;
×
579
      }
580
    }
581
  }
582

583
  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
×
584
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
585
  for (int32_t i = 0; i < req.replica; ++i) {
×
586
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
×
587
  }
588
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
×
589
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
×
590
  }
591

592
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
×
593
    terrno = TSDB_CODE_APP_ERROR;
×
594
    return NULL;
×
595
  }
596

597
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
×
598
  if (contLen < 0) {
×
599
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
600
    return NULL;
×
601
  }
602

603
  void *pReq = taosMemoryMalloc(contLen);
×
604
  if (pReq == NULL) {
×
605
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
606
    return NULL;
×
607
  }
608

609
  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
×
610
    mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
×
611
    taosMemoryFree(pReq);
×
612
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
613
    return NULL;
×
614
  }
615
  *pContLen = contLen;
×
616
  return pReq;
×
617
}
618

UNCOV
619
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
×
UNCOV
620
  SDisableVnodeWriteReq disableReq = {
×
621
      .vgId = vgId,
622
      .disable = 1,
623
  };
624

UNCOV
625
  mInfo("vgId:%d, build disable vnode write req", vgId);
×
UNCOV
626
  int32_t contLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
×
UNCOV
627
  if (contLen < 0) {
×
628
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
629
    return NULL;
×
630
  }
631

UNCOV
632
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
633
  if (pReq == NULL) {
×
634
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
635
    return NULL;
×
636
  }
637

UNCOV
638
  if (tSerializeSDisableVnodeWriteReq(pReq, contLen, &disableReq) < 0) {
×
639
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
×
640
    taosMemoryFree(pReq);
×
641
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
642
    return NULL;
×
643
  }
UNCOV
644
  *pContLen = contLen;
×
UNCOV
645
  return pReq;
×
646
}
647

UNCOV
648
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
×
UNCOV
649
  SAlterVnodeHashRangeReq alterReq = {
×
650
      .srcVgId = srcVgId,
UNCOV
651
      .dstVgId = pVgroup->vgId,
×
UNCOV
652
      .hashBegin = pVgroup->hashBegin,
×
UNCOV
653
      .hashEnd = pVgroup->hashEnd,
×
UNCOV
654
      .changeVersion = ++(pVgroup->syncConfChangeVer),
×
655
  };
656

UNCOV
657
  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
×
658
        pVgroup->hashBegin, pVgroup->hashEnd);
UNCOV
659
  int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
×
UNCOV
660
  if (contLen < 0) {
×
661
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
662
    return NULL;
×
663
  }
664

UNCOV
665
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
666
  if (pReq == NULL) {
×
667
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
668
    return NULL;
×
669
  }
670

UNCOV
671
  if (tSerializeSAlterVnodeHashRangeReq(pReq, contLen, &alterReq) < 0) {
×
672
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
×
673
    taosMemoryFree(pReq);
×
674
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
675
    return NULL;
×
676
  }
UNCOV
677
  *pContLen = contLen;
×
UNCOV
678
  return pReq;
×
679
}
680

UNCOV
681
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
×
UNCOV
682
  SDropVnodeReq dropReq = {0};
×
UNCOV
683
  dropReq.dnodeId = pDnode->id;
×
UNCOV
684
  dropReq.vgId = pVgroup->vgId;
×
UNCOV
685
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
686
  dropReq.dbUid = pDb->uid;
×
687

UNCOV
688
  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);
×
UNCOV
689
  int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
×
UNCOV
690
  if (contLen < 0) {
×
691
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
692
    return NULL;
×
693
  }
694

UNCOV
695
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
696
  if (pReq == NULL) {
×
697
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
698
    return NULL;
×
699
  }
700

UNCOV
701
  if (tSerializeSDropVnodeReq(pReq, contLen, &dropReq) < 0) {
×
702
    mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
×
703
    taosMemoryFree(pReq);
×
704
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
705
    return NULL;
×
706
  }
UNCOV
707
  *pContLen = contLen;
×
UNCOV
708
  return pReq;
×
709
}
710

UNCOV
711
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
UNCOV
712
  SDnodeObj *pDnode = pObj;
×
UNCOV
713
  pDnode->numOfVnodes = 0;
×
UNCOV
714
  pDnode->numOfOtherNodes = 0;
×
UNCOV
715
  return true;
×
716
}
717

UNCOV
718
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
UNCOV
719
  SDnodeObj *pDnode = pObj;
×
UNCOV
720
  SArray    *pArray = p1;
×
UNCOV
721
  int32_t    exceptDnodeId = *(int32_t *)p2;
×
UNCOV
722
  SArray    *dnodeList = p3;
×
723

UNCOV
724
  if (exceptDnodeId == pDnode->id) {
×
UNCOV
725
    return true;
×
726
  }
727

UNCOV
728
  if (dnodeList != NULL) {
×
UNCOV
729
    int32_t dnodeListSize = taosArrayGetSize(dnodeList);
×
UNCOV
730
    if (dnodeListSize > 0) {
×
UNCOV
731
      bool inDnodeList = false;
×
UNCOV
732
      for (int32_t index = 0; index < dnodeListSize; ++index) {
×
UNCOV
733
        int32_t dnodeId = *(int32_t *)taosArrayGet(dnodeList, index);
×
UNCOV
734
        if (pDnode->id == dnodeId) {
×
UNCOV
735
          inDnodeList = true;
×
736
        }
737
      }
UNCOV
738
      if (!inDnodeList) {
×
UNCOV
739
        return true;
×
740
      }
741
    }
742
  }
743

UNCOV
744
  int64_t curMs = taosGetTimestampMs();
×
UNCOV
745
  bool    online = mndIsDnodeOnline(pDnode, curMs);
×
UNCOV
746
  bool    isMnode = mndIsMnode(pMnode, pDnode->id);
×
UNCOV
747
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
×
UNCOV
748
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);
×
749

UNCOV
750
  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
×
751
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
752

UNCOV
753
  if (isMnode) {
×
UNCOV
754
    pDnode->numOfOtherNodes++;
×
755
  }
756

UNCOV
757
  if (online && pDnode->numOfSupportVnodes > 0) {
×
UNCOV
758
    if (taosArrayPush(pArray, pDnode) == NULL) return false;
×
759
  }
UNCOV
760
  return true;
×
761
}
762

UNCOV
763
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
×
UNCOV
764
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
765
  int32_t numOfDnodes = mndGetDnodeSize(pMnode);
×
766

UNCOV
767
  SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
×
UNCOV
768
  if (pArray == NULL) {
×
769
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
770
    return NULL;
×
771
  }
772

UNCOV
773
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
×
UNCOV
774
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, dnodeList);
×
775

UNCOV
776
  mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray));
×
UNCOV
777
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
×
UNCOV
778
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
UNCOV
779
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
780
  }
UNCOV
781
  return pArray;
×
782
}
783

784
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
×
785
  if (*dnode1Id == *dnode2Id) {
×
786
    return 0;
×
787
  }
788
  return *dnode1Id > *dnode2Id ? 1 : -1;
×
789
}
790

UNCOV
791
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
×
UNCOV
792
  float totalDnodes = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
×
UNCOV
793
  return totalDnodes / pDnode->numOfSupportVnodes;
×
794
}
795

UNCOV
796
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
×
UNCOV
797
  float d1Score = mndGetDnodeScore(pDnode1, 0, 0.9);
×
UNCOV
798
  float d2Score = mndGetDnodeScore(pDnode2, 0, 0.9);
×
UNCOV
799
  if (d1Score == d2Score) {
×
UNCOV
800
    return 0;
×
801
  }
UNCOV
802
  return d1Score > d2Score ? 1 : -1;
×
803
}
804

UNCOV
805
void mndSortVnodeGid(SVgObj *pVgroup) {
×
UNCOV
806
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
807
    for (int32_t j = 0; j < pVgroup->replica - 1 - i; ++j) {
×
UNCOV
808
      if (pVgroup->vnodeGid[j].dnodeId > pVgroup->vnodeGid[j + 1].dnodeId) {
×
UNCOV
809
        TSWAP(pVgroup->vnodeGid[j], pVgroup->vnodeGid[j + 1]);
×
810
      }
811
    }
812
  }
UNCOV
813
}
×
814

UNCOV
815
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
×
UNCOV
816
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
×
UNCOV
817
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
UNCOV
818
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
×
UNCOV
819
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
UNCOV
820
    mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9));
×
821
  }
822

UNCOV
823
  int32_t size = taosArrayGetSize(pArray);
×
UNCOV
824
  if (size < pVgroup->replica) {
×
UNCOV
825
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
×
826
           pVgroup->replica);
UNCOV
827
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
×
828
  }
829

UNCOV
830
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
831
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
UNCOV
832
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
×
UNCOV
833
    if (pDnode == NULL) {
×
834
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
×
835
    }
UNCOV
836
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
×
837
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
×
838
    }
839

UNCOV
840
    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
×
UNCOV
841
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
×
842
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
×
843
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
844
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
845
    } else {
UNCOV
846
      pDnode->memUsed += vgMem;
×
847
    }
848

UNCOV
849
    pVgid->dnodeId = pDnode->id;
×
UNCOV
850
    if (pVgroup->replica == 1) {
×
UNCOV
851
      pVgid->syncState = TAOS_SYNC_STATE_LEADER;
×
852
    } else {
UNCOV
853
      pVgid->syncState = TAOS_SYNC_STATE_FOLLOWER;
×
854
    }
855

UNCOV
856
    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
857
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
UNCOV
858
    pDnode->numOfVnodes++;
×
859
  }
860

UNCOV
861
  mndSortVnodeGid(pVgroup);
×
UNCOV
862
  return 0;
×
863
}
864

UNCOV
865
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
866
  int32_t code = 0;
×
UNCOV
867
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
×
UNCOV
868
  if (pArray == NULL) {
×
869
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
870
    if (terrno != 0) code = terrno;
×
871
    TAOS_RETURN(code);
×
872
  }
873

UNCOV
874
  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
×
UNCOV
875
  pVgroup->isTsma = 1;
×
UNCOV
876
  pVgroup->createdTime = taosGetTimestampMs();
×
UNCOV
877
  pVgroup->updateTime = pVgroup->createdTime;
×
UNCOV
878
  pVgroup->version = 1;
×
UNCOV
879
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
880
  pVgroup->dbUid = pDb->uid;
×
UNCOV
881
  pVgroup->replica = 1;
×
882

UNCOV
883
  if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
×
UNCOV
884
  taosArrayDestroy(pArray);
×
885

UNCOV
886
  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
×
UNCOV
887
  return 0;
×
888
}
889

UNCOV
890
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
×
UNCOV
891
  int32_t code = -1;
×
UNCOV
892
  SArray *pArray = NULL;
×
UNCOV
893
  SVgObj *pVgroups = NULL;
×
894

UNCOV
895
  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
×
UNCOV
896
  if (pVgroups == NULL) {
×
897
    code = terrno;
×
898
    goto _OVER;
×
899
  }
900

UNCOV
901
  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
×
UNCOV
902
  if (pArray == NULL) {
×
903
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
904
    if (terrno != 0) code = terrno;
×
905
    goto _OVER;
×
906
  }
907

UNCOV
908
  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
×
909
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);
910

UNCOV
911
  int32_t  allocedVgroups = 0;
×
UNCOV
912
  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
×
UNCOV
913
  uint32_t hashMin = 0;
×
UNCOV
914
  uint32_t hashMax = UINT32_MAX;
×
UNCOV
915
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;
×
916

UNCOV
917
  if (maxVgId < 2) maxVgId = 2;
×
918

UNCOV
919
  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
×
UNCOV
920
    SVgObj *pVgroup = &pVgroups[v];
×
UNCOV
921
    pVgroup->vgId = maxVgId++;
×
UNCOV
922
    pVgroup->createdTime = taosGetTimestampMs();
×
UNCOV
923
    pVgroup->updateTime = pVgroups->createdTime;
×
UNCOV
924
    pVgroup->version = 1;
×
UNCOV
925
    pVgroup->hashBegin = hashMin + hashInterval * v;
×
UNCOV
926
    if (v == pDb->cfg.numOfVgroups - 1) {
×
UNCOV
927
      pVgroup->hashEnd = hashMax;
×
928
    } else {
UNCOV
929
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
×
930
    }
931

UNCOV
932
    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
933
    pVgroup->dbUid = pDb->uid;
×
UNCOV
934
    pVgroup->replica = pDb->cfg.replications;
×
935

UNCOV
936
    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
×
UNCOV
937
      goto _OVER;
×
938
    }
939

UNCOV
940
    allocedVgroups++;
×
941
  }
942

UNCOV
943
  *ppVgroups = pVgroups;
×
UNCOV
944
  code = 0;
×
945

UNCOV
946
  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
×
947

948
_OVER:
×
UNCOV
949
  if (code != 0) taosMemoryFree(pVgroups);
×
UNCOV
950
  taosArrayDestroy(pArray);
×
UNCOV
951
  TAOS_RETURN(code);
×
952
}
953

UNCOV
954
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
×
UNCOV
955
  SEpSet epset = {0};
×
956

UNCOV
957
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
958
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
UNCOV
959
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
960
    if (pDnode == NULL) continue;
×
961

UNCOV
962
    if (pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
UNCOV
963
      epset.inUse = epset.numOfEps;
×
964
    }
965

UNCOV
966
    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
×
967
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
×
968
    }
UNCOV
969
    mndReleaseDnode(pMnode, pDnode);
×
970
  }
UNCOV
971
  epsetSort(&epset);
×
972

UNCOV
973
  return epset;
×
974
}
975

UNCOV
976
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
×
UNCOV
977
  SEpSet epset = {0};
×
978

UNCOV
979
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
×
UNCOV
980
  if (!pVgroup) return epset;
×
981

UNCOV
982
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
983
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
×
UNCOV
984
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
985
    if (pDnode == NULL) continue;
×
986

UNCOV
987
    if (pVgid->syncState == TAOS_SYNC_STATE_LEADER || pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
UNCOV
988
      epset.inUse = epset.numOfEps;
×
989
    }
990

UNCOV
991
    if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
×
992
      mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
×
993
    }
UNCOV
994
    mndReleaseDnode(pMnode, pDnode);
×
995
  }
996

UNCOV
997
  mndReleaseVgroup(pMnode, pVgroup);
×
UNCOV
998
  return epset;
×
999
}
1000

UNCOV
1001
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
UNCOV
1002
  SMnode *pMnode = pReq->info.node;
×
UNCOV
1003
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
1004
  int32_t numOfRows = 0;
×
UNCOV
1005
  SVgObj *pVgroup = NULL;
×
UNCOV
1006
  int32_t cols = 0;
×
UNCOV
1007
  int64_t curMs = taosGetTimestampMs();
×
UNCOV
1008
  int32_t code = 0;
×
1009

UNCOV
1010
  SDbObj *pDb = NULL;
×
UNCOV
1011
  if (strlen(pShow->db) > 0) {
×
UNCOV
1012
    pDb = mndAcquireDb(pMnode, pShow->db);
×
UNCOV
1013
    if (pDb == NULL) {
×
1014
      return 0;
×
1015
    }
1016
  }
1017

UNCOV
1018
  while (numOfRows < rows) {
×
UNCOV
1019
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
×
UNCOV
1020
    if (pShow->pIter == NULL) break;
×
1021

UNCOV
1022
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
×
UNCOV
1023
      sdbRelease(pSdb, pVgroup);
×
UNCOV
1024
      continue;
×
1025
    }
1026

UNCOV
1027
    cols = 0;
×
UNCOV
1028
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1029
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
×
UNCOV
1030
    if (code != 0) {
×
1031
      mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
×
1032
      return code;
×
1033
    }
1034

UNCOV
1035
    SName name = {0};
×
UNCOV
1036
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1037
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
×
UNCOV
1038
    if (code != 0) {
×
1039
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1040
      return code;
×
1041
    }
UNCOV
1042
    (void)tNameGetDbName(&name, varDataVal(db));
×
UNCOV
1043
    varDataSetLen(db, strlen(varDataVal(db)));
×
1044

UNCOV
1045
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1046
    code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
×
UNCOV
1047
    if (code != 0) {
×
1048
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1049
      return code;
×
1050
    }
1051

UNCOV
1052
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1053
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
×
UNCOV
1054
    if (code != 0) {
×
1055
      mError("vgId:%d, failed to set numOfTables, since %s", pVgroup->vgId, tstrerror(code));
×
1056
      return code;
×
1057
    }
1058

1059
    // default 3 replica, add 1 replica if move vnode
UNCOV
1060
    for (int32_t i = 0; i < 4; ++i) {
×
UNCOV
1061
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1062
      if (i < pVgroup->replica) {
×
UNCOV
1063
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
×
UNCOV
1064
        code = colDataSetVal(pColInfo, numOfRows, (const char *)&dnodeId, false);
×
UNCOV
1065
        if (code != 0) {
×
1066
          mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
×
1067
          return code;
×
1068
        }
1069

UNCOV
1070
        bool       exist = false;
×
UNCOV
1071
        bool       online = false;
×
UNCOV
1072
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
×
UNCOV
1073
        if (pDnode != NULL) {
×
UNCOV
1074
          exist = true;
×
UNCOV
1075
          online = mndIsDnodeOnline(pDnode, curMs);
×
UNCOV
1076
          mndReleaseDnode(pMnode, pDnode);
×
1077
        }
1078

UNCOV
1079
        char buf1[20] = {0};
×
UNCOV
1080
        char role[20] = "offline";
×
UNCOV
1081
        if (!exist) {
×
1082
          tstrncpy(role, "dropping", sizeof(role));
×
UNCOV
1083
        } else if (online) {
×
UNCOV
1084
          char *star = "";
×
UNCOV
1085
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
×
UNCOV
1086
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
UNCOV
1087
            if (!pVgroup->vnodeGid[i].syncRestore && !pVgroup->vnodeGid[i].syncCanRead) {
×
UNCOV
1088
              star = "**";
×
UNCOV
1089
            } else if (!pVgroup->vnodeGid[i].syncRestore && pVgroup->vnodeGid[i].syncCanRead) {
×
1090
              star = "*";
×
1091
            } else {
1092
            }
1093
          }
UNCOV
1094
          snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
×
1095
          /*
1096
          mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
1097

1098
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
1099
            if(pVgroup->vnodeGid[i].learnerProgress < 0){
1100
              snprintf(role, sizeof(role), "%s-",
1101
                syncStr(pVgroup->vnodeGid[i].syncState));
1102

1103
            }
1104
            else if(pVgroup->vnodeGid[i].learnerProgress >= 100){
1105
              snprintf(role, sizeof(role), "%s--",
1106
                syncStr(pVgroup->vnodeGid[i].syncState));
1107
            }
1108
            else{
1109
              snprintf(role, sizeof(role), "%s%d",
1110
                syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress);
1111
            }
1112
          }
1113
          else{
1114
            snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
1115
          }
1116
          */
1117
        } else {
1118
        }
UNCOV
1119
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
×
1120

UNCOV
1121
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1122
        code = colDataSetVal(pColInfo, numOfRows, (const char *)buf1, false);
×
UNCOV
1123
        if (code != 0) {
×
1124
          mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code));
×
1125
          return code;
×
1126
        }
1127

UNCOV
1128
        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
×
UNCOV
1129
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
×
UNCOV
1130
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
×
UNCOV
1131
                 pVgroup->vnodeGid[i].syncCommitIndex);
×
UNCOV
1132
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);
×
1133

UNCOV
1134
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1135
        code = colDataSetVal(pColInfo, numOfRows, (const char *)&buf, false);
×
UNCOV
1136
        if (code != 0) {
×
1137
          mError("vgId:%d, failed to set role, since %s", pVgroup->vgId, tstrerror(code));
×
1138
          return code;
×
1139
        }
1140
      } else {
UNCOV
1141
        colDataSetNULL(pColInfo, numOfRows);
×
UNCOV
1142
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1143
        colDataSetNULL(pColInfo, numOfRows);
×
UNCOV
1144
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1145
        colDataSetNULL(pColInfo, numOfRows);
×
1146
      }
1147
    }
1148

UNCOV
1149
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1150
    int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
×
UNCOV
1151
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
×
UNCOV
1152
    if (code != 0) {
×
1153
      mError("vgId:%d, failed to set cacheUsage, since %s", pVgroup->vgId, tstrerror(code));
×
1154
      return code;
×
1155
    }
1156

UNCOV
1157
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1158
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false);
×
UNCOV
1159
    if (code != 0) {
×
1160
      mError("vgId:%d, failed to set numOfCachedTables, since %s", pVgroup->vgId, tstrerror(code));
×
1161
      return code;
×
1162
    }
1163

UNCOV
1164
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1165
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
×
UNCOV
1166
    if (code != 0) {
×
1167
      mError("vgId:%d, failed to set isTsma, since %s", pVgroup->vgId, tstrerror(code));
×
1168
      return code;
×
1169
    }
UNCOV
1170
    numOfRows++;
×
UNCOV
1171
    sdbRelease(pSdb, pVgroup);
×
1172
  }
1173

UNCOV
1174
  if (pDb != NULL) {
×
UNCOV
1175
    mndReleaseDb(pMnode, pDb);
×
1176
  }
1177

UNCOV
1178
  pShow->numOfRows += numOfRows;
×
UNCOV
1179
  return numOfRows;
×
1180
}
1181

1182
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
×
1183
  SSdb *pSdb = pMnode->pSdb;
×
1184
  sdbCancelFetchByType(pSdb, pIter, SDB_VGROUP);
×
1185
}
×
1186

UNCOV
1187
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
UNCOV
1188
  SVgObj  *pVgroup = pObj;
×
UNCOV
1189
  int32_t  dnodeId = *(int32_t *)p1;
×
UNCOV
1190
  int32_t *pNumOfVnodes = (int32_t *)p2;
×
1191

UNCOV
1192
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
1193
    if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
×
UNCOV
1194
      (*pNumOfVnodes)++;
×
1195
    }
1196
  }
1197

UNCOV
1198
  return true;
×
1199
}
1200

UNCOV
1201
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
×
UNCOV
1202
  int32_t numOfVnodes = 0;
×
UNCOV
1203
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL);
×
UNCOV
1204
  return numOfVnodes;
×
1205
}
1206

UNCOV
1207
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
×
UNCOV
1208
  SDbObj *pDb = pDbInput;
×
UNCOV
1209
  if (pDbInput == NULL) {
×
UNCOV
1210
    pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
1211
  }
1212

UNCOV
1213
  int64_t vgroupMemroy = 0;
×
UNCOV
1214
  if (pDb != NULL) {
×
UNCOV
1215
    int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
×
UNCOV
1216
    int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
×
UNCOV
1217
    vgroupMemroy = buffer + cache;
×
UNCOV
1218
    int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
×
UNCOV
1219
    if (pDb->cfg.cacheLast > 0) {
×
UNCOV
1220
      vgroupMemroy += cacheLast;
×
1221
    }
UNCOV
1222
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
×
1223
           buffer, cache, cacheLast);
1224
  }
1225

UNCOV
1226
  if (pDbInput == NULL) {
×
UNCOV
1227
    mndReleaseDb(pMnode, pDb);
×
1228
  }
UNCOV
1229
  return vgroupMemroy;
×
1230
}
1231

UNCOV
1232
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
UNCOV
1233
  SVgObj  *pVgroup = pObj;
×
UNCOV
1234
  int32_t  dnodeId = *(int32_t *)p1;
×
UNCOV
1235
  int64_t *pVnodeMemory = (int64_t *)p2;
×
1236

UNCOV
1237
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
×
UNCOV
1238
    if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
×
UNCOV
1239
      *pVnodeMemory += mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1240
    }
1241
  }
1242

UNCOV
1243
  return true;
×
1244
}
1245

UNCOV
1246
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
×
UNCOV
1247
  int64_t vnodeMemory = 0;
×
UNCOV
1248
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &vnodeMemory, NULL);
×
UNCOV
1249
  return vnodeMemory;
×
1250
}
1251

UNCOV
1252
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
×
UNCOV
1253
  if (rate == 0) {
×
UNCOV
1254
    snprintf(restoreStr, restoreStrSize, "0:0:0");
×
UNCOV
1255
    return;
×
1256
  }
1257

UNCOV
1258
  int64_t costTime = applyCount / rate;
×
UNCOV
1259
  int64_t totalSeconds = costTime / 1000;
×
UNCOV
1260
  int64_t hours = totalSeconds / 3600;
×
UNCOV
1261
  totalSeconds %= 3600;
×
UNCOV
1262
  int64_t minutes = totalSeconds / 60;
×
UNCOV
1263
  int64_t seconds = totalSeconds % 60;
×
UNCOV
1264
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
×
1265
}
1266

UNCOV
1267
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
UNCOV
1268
  SMnode *pMnode = pReq->info.node;
×
UNCOV
1269
  SSdb   *pSdb = pMnode->pSdb;
×
UNCOV
1270
  int32_t numOfRows = 0;
×
UNCOV
1271
  SVgObj *pVgroup = NULL;
×
UNCOV
1272
  int32_t cols = 0;
×
UNCOV
1273
  int64_t curMs = taosGetTimestampMs();
×
UNCOV
1274
  int32_t code = 0;
×
1275

UNCOV
1276
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
×
UNCOV
1277
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
×
UNCOV
1278
    if (pShow->pIter == NULL) break;
×
1279

UNCOV
1280
    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
×
UNCOV
1281
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
×
UNCOV
1282
      SColumnInfoData *pColInfo = NULL;
×
UNCOV
1283
      cols = 0;
×
1284

UNCOV
1285
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1286
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
×
UNCOV
1287
      if (code != 0) {
×
1288
        mError("vgId:%d, failed to set dnodeId, since %s", pVgroup->vgId, tstrerror(code));
×
1289
        return code;
×
1290
      }
UNCOV
1291
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1292
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
×
UNCOV
1293
      if (code != 0) {
×
1294
        mError("vgId:%d, failed to set vgId, since %s", pVgroup->vgId, tstrerror(code));
×
1295
        return code;
×
1296
      }
1297

1298
      // db_name
UNCOV
1299
      const char *dbname = mndGetDbStr(pVgroup->dbName);
×
UNCOV
1300
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1301
      if (dbname != NULL) {
×
UNCOV
1302
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
×
1303
      } else {
1304
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
×
1305
      }
UNCOV
1306
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1307
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
×
UNCOV
1308
      if (code != 0) {
×
1309
        mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1310
        return code;
×
1311
      }
1312

1313
      // dnode is online?
UNCOV
1314
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
×
UNCOV
1315
      if (pDnode == NULL) {
×
1316
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
×
1317
        break;
×
1318
      }
UNCOV
1319
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
×
1320

UNCOV
1321
      char       buf[20] = {0};
×
UNCOV
1322
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
×
UNCOV
1323
      STR_TO_VARSTR(buf, syncStr(syncState));
×
UNCOV
1324
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1325
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
UNCOV
1326
      if (code != 0) {
×
1327
        mError("vgId:%d, failed to set syncState, since %s", pVgroup->vgId, tstrerror(code));
×
1328
        return code;
×
1329
      }
1330

UNCOV
1331
      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
×
UNCOV
1332
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1333
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
×
UNCOV
1334
      if (code != 0) {
×
1335
        mError("vgId:%d, failed to set roleTimeMs, since %s", pVgroup->vgId, tstrerror(code));
×
1336
        return code;
×
1337
      }
1338

UNCOV
1339
      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
×
UNCOV
1340
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1341
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
×
UNCOV
1342
      if (code != 0) {
×
1343
        mError("vgId:%d, failed to set startTimeMs, since %s", pVgroup->vgId, tstrerror(code));
×
1344
        return code;
×
1345
      }
1346

UNCOV
1347
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1348
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
×
UNCOV
1349
      if (code != 0) {
×
1350
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
×
1351
        return code;
×
1352
      }
1353

UNCOV
1354
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
×
UNCOV
1355
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1356
      char restoreStr[20] = {0};
×
UNCOV
1357
      if (unappliedCount > 0) {
×
UNCOV
1358
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
×
1359
      }
UNCOV
1360
      STR_TO_VARSTR(buf, restoreStr);
×
UNCOV
1361
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&buf, false);
×
UNCOV
1362
      if (code != 0) {
×
1363
        mError("vgId:%d, failed to set syncRestore finish time, since %s", pVgroup->vgId, tstrerror(code));
×
1364
        return code;
×
1365
      }
1366

UNCOV
1367
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1368
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&unappliedCount, false);
×
UNCOV
1369
      if (code != 0) {
×
1370
        mError("vgId:%d, failed to set syncRestore, since %s", pVgroup->vgId, tstrerror(code));
×
1371
        return code;
×
1372
      }
1373

UNCOV
1374
      numOfRows++;
×
UNCOV
1375
      sdbRelease(pSdb, pDnode);
×
1376
    }
1377

UNCOV
1378
    sdbRelease(pSdb, pVgroup);
×
1379
  }
1380

UNCOV
1381
  pShow->numOfRows += numOfRows;
×
UNCOV
1382
  return numOfRows;
×
1383
}
1384

1385
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
×
1386
  SSdb *pSdb = pMnode->pSdb;
×
1387
  sdbCancelFetchByType(pSdb, pIter, SDB_VGROUP);
×
1388
}
×
1389

UNCOV
1390
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
×
UNCOV
1391
  int32_t code = 0;
×
UNCOV
1392
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
UNCOV
1393
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
UNCOV
1394
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
UNCOV
1395
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1396
  }
1397

UNCOV
1398
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
×
UNCOV
1399
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
×
UNCOV
1400
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1401

UNCOV
1402
    bool used = false;
×
UNCOV
1403
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
UNCOV
1404
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
×
UNCOV
1405
        used = true;
×
UNCOV
1406
        break;
×
1407
      }
1408
    }
UNCOV
1409
    if (used) continue;
×
1410

UNCOV
1411
    if (pDnode == NULL) {
×
1412
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
×
1413
    }
UNCOV
1414
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
×
1415
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
×
1416
    }
1417

UNCOV
1418
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
UNCOV
1419
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
×
1420
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1421
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
1422
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
1423
    } else {
UNCOV
1424
      pDnode->memUsed += vgMem;
×
1425
    }
1426

UNCOV
1427
    pVgid->dnodeId = pDnode->id;
×
UNCOV
1428
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
×
UNCOV
1429
    mInfo("db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1430
          pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1431

UNCOV
1432
    pVgroup->replica++;
×
UNCOV
1433
    pDnode->numOfVnodes++;
×
1434

UNCOV
1435
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
1436
    if (pVgRaw == NULL) {
×
1437
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1438
      if (terrno != 0) code = terrno;
×
1439
      TAOS_RETURN(code);
×
1440
    }
UNCOV
1441
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
1442
      sdbFreeRaw(pVgRaw);
×
1443
      TAOS_RETURN(code);
×
1444
    }
UNCOV
1445
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
1446
    if (code != 0) {
×
1447
      mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
1448
    }
UNCOV
1449
    TAOS_RETURN(code);
×
1450
  }
1451

UNCOV
1452
  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
×
UNCOV
1453
  mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
UNCOV
1454
  TAOS_RETURN(code);
×
1455
}
1456

UNCOV
1457
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1458
                                        SVnodeGid *pDelVgid) {
UNCOV
1459
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
UNCOV
1460
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
UNCOV
1461
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
UNCOV
1462
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1463
  }
1464

UNCOV
1465
  int32_t code = -1;
×
UNCOV
1466
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
UNCOV
1467
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1468

UNCOV
1469
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
UNCOV
1470
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
UNCOV
1471
      if (pVgid->dnodeId == pDnode->id) {
×
UNCOV
1472
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
UNCOV
1473
        pDnode->memUsed -= vgMem;
×
UNCOV
1474
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1475
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
UNCOV
1476
        pDnode->numOfVnodes--;
×
UNCOV
1477
        pVgroup->replica--;
×
UNCOV
1478
        *pDelVgid = *pVgid;
×
UNCOV
1479
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
UNCOV
1480
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
UNCOV
1481
        code = 0;
×
UNCOV
1482
        goto _OVER;
×
1483
      }
1484
    }
1485
  }
1486

1487
_OVER:
×
UNCOV
1488
  if (code != 0) {
×
1489
    code = TSDB_CODE_APP_ERROR;
×
1490
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1491
    TAOS_RETURN(code);
×
1492
  }
1493

UNCOV
1494
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
UNCOV
1495
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
UNCOV
1496
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1497
  }
1498

UNCOV
1499
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
1500
  if (pVgRaw == NULL) {
×
1501
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1502
    if (terrno != 0) code = terrno;
×
1503
    TAOS_RETURN(code);
×
1504
  }
UNCOV
1505
  if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
×
1506
    sdbFreeRaw(pVgRaw);
×
1507
    TAOS_RETURN(code);
×
1508
  }
UNCOV
1509
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
1510
  if (code != 0) {
×
1511
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
1512
  }
1513

UNCOV
1514
  TAOS_RETURN(code);
×
1515
}
1516

1517
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1518
                                                   SVnodeGid *pDelVgid) {
1519
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1520
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1521
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1522
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1523
  }
1524

1525
  int32_t code = -1;
×
1526
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
1527
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1528

1529
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1530
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1531
      if (pVgid->dnodeId == pDnode->id) {
×
1532
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1533
        pDnode->memUsed -= vgMem;
×
1534
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1535
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1536
        pDnode->numOfVnodes--;
×
1537
        pVgroup->replica--;
×
1538
        *pDelVgid = *pVgid;
×
1539
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1540
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1541
        code = 0;
×
1542
        goto _OVER;
×
1543
      }
1544
    }
1545
  }
1546

1547
_OVER:
×
1548
  if (code != 0) {
×
1549
    code = TSDB_CODE_APP_ERROR;
×
1550
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1551
    TAOS_RETURN(code);
×
1552
  }
1553

1554
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1555
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1556
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1557
  }
1558

1559
  TAOS_RETURN(code);
×
1560
}
1561

UNCOV
1562
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
×
UNCOV
1563
  int32_t      code = 0;
×
UNCOV
1564
  STransAction action = {0};
×
1565

UNCOV
1566
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
1567
  if (pDnode == NULL) return -1;
×
UNCOV
1568
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
1569
  mndReleaseDnode(pMnode, pDnode);
×
1570

UNCOV
1571
  int32_t contLen = 0;
×
UNCOV
1572
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
1573
  if (pReq == NULL) return -1;
×
1574

UNCOV
1575
  action.pCont = pReq;
×
UNCOV
1576
  action.contLen = contLen;
×
UNCOV
1577
  action.msgType = TDMT_DND_CREATE_VNODE;
×
UNCOV
1578
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
1579

UNCOV
1580
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1581
    taosMemoryFree(pReq);
×
1582
    TAOS_RETURN(code);
×
1583
  }
1584

UNCOV
1585
  TAOS_RETURN(code);
×
1586
}
1587

UNCOV
1588
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
1589
                                       SDnodeObj *pDnode) {
UNCOV
1590
  int32_t      code = 0;
×
UNCOV
1591
  STransAction action = {0};
×
1592

UNCOV
1593
  action.epSet = mndGetDnodeEpset(pDnode);
×
1594

UNCOV
1595
  int32_t contLen = 0;
×
UNCOV
1596
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
1597
  if (pReq == NULL) {
×
1598
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1599
    if (terrno != 0) code = terrno;
×
1600
    TAOS_RETURN(code);
×
1601
  }
1602

UNCOV
1603
  action.pCont = pReq;
×
UNCOV
1604
  action.contLen = contLen;
×
UNCOV
1605
  action.msgType = TDMT_DND_CREATE_VNODE;
×
UNCOV
1606
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
1607

UNCOV
1608
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1609
    taosMemoryFree(pReq);
×
1610
    TAOS_RETURN(code);
×
1611
  }
1612

UNCOV
1613
  TAOS_RETURN(code);
×
1614
}
1615

UNCOV
1616
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
1617
  int32_t      code = 0;
×
UNCOV
1618
  STransAction action = {0};
×
UNCOV
1619
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
1620

UNCOV
1621
  mInfo("vgId:%d, build alter vnode confirm req", pVgroup->vgId);
×
UNCOV
1622
  int32_t   contLen = sizeof(SMsgHead);
×
UNCOV
1623
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
UNCOV
1624
  if (pHead == NULL) {
×
1625
    TAOS_RETURN(terrno);
×
1626
  }
1627

UNCOV
1628
  pHead->contLen = htonl(contLen);
×
UNCOV
1629
  pHead->vgId = htonl(pVgroup->vgId);
×
1630

UNCOV
1631
  action.pCont = pHead;
×
UNCOV
1632
  action.contLen = contLen;
×
UNCOV
1633
  action.msgType = TDMT_VND_ALTER_CONFIRM;
×
1634
  // incorrect redirect result will cause this erro
UNCOV
1635
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
×
1636

UNCOV
1637
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1638
    taosMemoryFree(pHead);
×
1639
    TAOS_RETURN(code);
×
1640
  }
1641

UNCOV
1642
  TAOS_RETURN(code);
×
1643
}
1644

1645
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
×
1646
                                 int32_t dnodeId) {
1647
  int32_t      code = 0;
×
1648
  STransAction action = {0};
×
1649
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);
×
1650

1651
  int32_t contLen = 0;
×
1652
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &contLen);
×
1653
  if (pReq == NULL) {
×
1654
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1655
    if (terrno != 0) code = terrno;
×
1656
    TAOS_RETURN(code);
×
1657
  }
1658

1659
  int32_t totallen = contLen + sizeof(SMsgHead);
×
1660

1661
  SMsgHead *pHead = taosMemoryMalloc(totallen);
×
1662
  if (pHead == NULL) {
×
1663
    taosMemoryFree(pReq);
×
1664
    TAOS_RETURN(terrno);
×
1665
  }
1666

1667
  pHead->contLen = htonl(totallen);
×
1668
  pHead->vgId = htonl(pNewVgroup->vgId);
×
1669

1670
  memcpy((void *)(pHead + 1), pReq, contLen);
×
1671
  taosMemoryFree(pReq);
×
1672

1673
  action.pCont = pHead;
×
1674
  action.contLen = totallen;
×
1675
  action.msgType = TDMT_SYNC_CONFIG_CHANGE;
×
1676

1677
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1678
    taosMemoryFree(pHead);
×
1679
    TAOS_RETURN(code);
×
1680
  }
1681

1682
  TAOS_RETURN(code);
×
1683
}
1684

UNCOV
1685
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
×
UNCOV
1686
  int32_t      code = 0;
×
UNCOV
1687
  STransAction action = {0};
×
UNCOV
1688
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
1689

UNCOV
1690
  int32_t contLen = 0;
×
UNCOV
1691
  void   *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &contLen);
×
UNCOV
1692
  if (pReq == NULL) {
×
1693
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1694
    if (terrno != 0) code = terrno;
×
1695
    TAOS_RETURN(code);
×
1696
  }
1697

UNCOV
1698
  action.pCont = pReq;
×
UNCOV
1699
  action.contLen = contLen;
×
UNCOV
1700
  action.msgType = TDMT_VND_ALTER_HASHRANGE;
×
UNCOV
1701
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
1702

UNCOV
1703
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1704
    taosMemoryFree(pReq);
×
1705
    TAOS_RETURN(code);
×
1706
  }
1707

UNCOV
1708
  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
×
UNCOV
1709
  TAOS_RETURN(code);
×
1710
}
1711

UNCOV
1712
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
1713
  int32_t      code = 0;
×
UNCOV
1714
  STransAction action = {0};
×
UNCOV
1715
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
1716

UNCOV
1717
  int32_t contLen = 0;
×
UNCOV
1718
  void   *pReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &contLen);
×
UNCOV
1719
  if (pReq == NULL) {
×
1720
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1721
    if (terrno != 0) code = terrno;
×
1722
    TAOS_RETURN(code);
×
1723
  }
1724

UNCOV
1725
  action.pCont = pReq;
×
UNCOV
1726
  action.contLen = contLen;
×
UNCOV
1727
  action.msgType = TDMT_VND_ALTER_CONFIG;
×
1728

UNCOV
1729
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1730
    taosMemoryFree(pReq);
×
1731
    TAOS_RETURN(code);
×
1732
  }
1733

UNCOV
1734
  TAOS_RETURN(code);
×
1735
}
1736

UNCOV
1737
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
×
UNCOV
1738
  int32_t  code = 0;
×
UNCOV
1739
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
×
UNCOV
1740
  if (pRaw == NULL) {
×
1741
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1742
    if (terrno != 0) code = terrno;
×
1743
    goto _err;
×
1744
  }
1745

UNCOV
1746
  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);
×
UNCOV
1747
  if (sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
×
1748
    mError("vgId:%d, failed to set raw status at line:%d", pVg->vgId, __LINE__);
×
1749
  }
UNCOV
1750
  if (code != 0) {
×
1751
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
1752
    TAOS_RETURN(code);
×
1753
  }
UNCOV
1754
  pRaw = NULL;
×
UNCOV
1755
  TAOS_RETURN(code);
×
1756

1757
_err:
×
1758
  sdbFreeRaw(pRaw);
×
1759
  TAOS_RETURN(code);
×
1760
}
1761

UNCOV
1762
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
×
UNCOV
1763
  int32_t    code = 0;
×
UNCOV
1764
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
UNCOV
1765
  if (pDnode == NULL) {
×
1766
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1767
    if (terrno != 0) code = terrno;
×
1768
    TAOS_RETURN(code);
×
1769
  }
1770

UNCOV
1771
  STransAction action = {0};
×
UNCOV
1772
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
1773
  mndReleaseDnode(pMnode, pDnode);
×
1774

UNCOV
1775
  int32_t contLen = 0;
×
UNCOV
1776
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
×
UNCOV
1777
  if (pReq == NULL) {
×
1778
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1779
    if (terrno != 0) code = terrno;
×
1780
    TAOS_RETURN(code);
×
1781
  }
1782

UNCOV
1783
  action.pCont = pReq;
×
UNCOV
1784
  action.contLen = contLen;
×
UNCOV
1785
  action.msgType = TDMT_VND_ALTER_REPLICA;
×
1786

UNCOV
1787
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1788
    taosMemoryFree(pReq);
×
1789
    TAOS_RETURN(code);
×
1790
  }
1791

UNCOV
1792
  TAOS_RETURN(code);
×
1793
}
1794

1795
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
×
1796
  int32_t    code = 0;
×
1797
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
1798
  if (pDnode == NULL) {
×
1799
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1800
    if (terrno != 0) code = terrno;
×
1801
    TAOS_RETURN(code);
×
1802
  }
1803

1804
  STransAction action = {0};
×
1805
  action.epSet = mndGetDnodeEpset(pDnode);
×
1806
  mndReleaseDnode(pMnode, pDnode);
×
1807

1808
  int32_t contLen = 0;
×
1809
  void   *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
×
1810
  if (pReq == NULL) {
×
1811
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1812
    if (terrno != 0) code = terrno;
×
1813
    TAOS_RETURN(code);
×
1814
  }
1815

1816
  action.pCont = pReq;
×
1817
  action.contLen = contLen;
×
1818
  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
×
1819
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1820
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
×
1821

1822
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1823
    taosMemoryFree(pReq);
×
1824
    TAOS_RETURN(code);
×
1825
  }
1826

1827
  TAOS_RETURN(code);
×
1828
}
1829

UNCOV
1830
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
×
UNCOV
1831
  int32_t    code = 0;
×
UNCOV
1832
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
UNCOV
1833
  if (pDnode == NULL) {
×
1834
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1835
    if (terrno != 0) code = terrno;
×
1836
    TAOS_RETURN(code);
×
1837
  }
1838

UNCOV
1839
  STransAction action = {0};
×
UNCOV
1840
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
1841
  mndReleaseDnode(pMnode, pDnode);
×
1842

UNCOV
1843
  int32_t contLen = 0;
×
UNCOV
1844
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
×
UNCOV
1845
  if (pReq == NULL) {
×
1846
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1847
    if (terrno != 0) code = terrno;
×
1848
    TAOS_RETURN(code);
×
1849
  }
1850

UNCOV
1851
  action.pCont = pReq;
×
UNCOV
1852
  action.contLen = contLen;
×
UNCOV
1853
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
×
UNCOV
1854
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
UNCOV
1855
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
×
1856

UNCOV
1857
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1858
    taosMemoryFree(pReq);
×
1859
    TAOS_RETURN(code);
×
1860
  }
1861

UNCOV
1862
  TAOS_RETURN(code);
×
1863
}
1864

UNCOV
1865
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
1866
                                          SDnodeObj *pDnode) {
UNCOV
1867
  int32_t      code = 0;
×
UNCOV
1868
  STransAction action = {0};
×
UNCOV
1869
  action.epSet = mndGetDnodeEpset(pDnode);
×
1870

UNCOV
1871
  int32_t contLen = 0;
×
UNCOV
1872
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
×
UNCOV
1873
  if (pReq == NULL) {
×
1874
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1875
    if (terrno != 0) code = terrno;
×
1876
    TAOS_RETURN(code);
×
1877
  }
1878

UNCOV
1879
  action.pCont = pReq;
×
UNCOV
1880
  action.contLen = contLen;
×
UNCOV
1881
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
×
UNCOV
1882
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
UNCOV
1883
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
×
1884

UNCOV
1885
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1886
    taosMemoryFree(pReq);
×
1887
    TAOS_RETURN(code);
×
1888
  }
1889

UNCOV
1890
  TAOS_RETURN(code);
×
1891
}
1892

UNCOV
1893
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
1894
                                             int32_t dnodeId) {
UNCOV
1895
  int32_t    code = 0;
×
UNCOV
1896
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
UNCOV
1897
  if (pDnode == NULL) {
×
1898
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1899
    if (terrno != 0) code = terrno;
×
1900
    TAOS_RETURN(code);
×
1901
  }
1902

UNCOV
1903
  STransAction action = {0};
×
UNCOV
1904
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
1905
  mndReleaseDnode(pMnode, pDnode);
×
1906

UNCOV
1907
  int32_t contLen = 0;
×
UNCOV
1908
  void   *pReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &contLen);
×
UNCOV
1909
  if (pReq == NULL) {
×
1910
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1911
    if (terrno != 0) code = terrno;
×
1912
    TAOS_RETURN(code);
×
1913
  }
1914

UNCOV
1915
  action.pCont = pReq;
×
UNCOV
1916
  action.contLen = contLen;
×
UNCOV
1917
  action.msgType = TDMT_VND_DISABLE_WRITE;
×
1918

UNCOV
1919
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1920
    taosMemoryFree(pReq);
×
1921
    TAOS_RETURN(code);
×
1922
  }
1923

UNCOV
1924
  TAOS_RETURN(code);
×
1925
}
1926

UNCOV
1927
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
×
1928
                              bool isRedo) {
UNCOV
1929
  int32_t      code = 0;
×
UNCOV
1930
  STransAction action = {0};
×
1931

UNCOV
1932
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
1933
  if (pDnode == NULL) {
×
1934
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1935
    if (terrno != 0) code = terrno;
×
1936
    TAOS_RETURN(code);
×
1937
  }
UNCOV
1938
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
1939
  mndReleaseDnode(pMnode, pDnode);
×
1940

UNCOV
1941
  int32_t contLen = 0;
×
UNCOV
1942
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
1943
  if (pReq == NULL) {
×
1944
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1945
    if (terrno != 0) code = terrno;
×
1946
    TAOS_RETURN(code);
×
1947
  }
1948

UNCOV
1949
  action.pCont = pReq;
×
UNCOV
1950
  action.contLen = contLen;
×
UNCOV
1951
  action.msgType = TDMT_DND_DROP_VNODE;
×
UNCOV
1952
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
×
1953

UNCOV
1954
  if (isRedo) {
×
UNCOV
1955
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
1956
      taosMemoryFree(pReq);
×
1957
      TAOS_RETURN(code);
×
1958
    }
1959
  } else {
UNCOV
1960
    if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
×
1961
      taosMemoryFree(pReq);
×
1962
      TAOS_RETURN(code);
×
1963
    }
1964
  }
1965

UNCOV
1966
  TAOS_RETURN(code);
×
1967
}
1968

UNCOV
1969
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
×
1970
                                    SArray *pArray, bool force, bool unsafe) {
UNCOV
1971
  int32_t code = 0;
×
UNCOV
1972
  SVgObj  newVg = {0};
×
UNCOV
1973
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
×
1974

UNCOV
1975
  mInfo("vgId:%d, vgroup info before move, replica:%d", newVg.vgId, newVg.replica);
×
UNCOV
1976
  for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
1977
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
×
1978
  }
1979

UNCOV
1980
  if (!force) {
×
1981
#if 1
1982
    {
1983
#else
1984
    if (newVg.replica == 1) {
1985
#endif
UNCOV
1986
      mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
×
UNCOV
1987
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
UNCOV
1988
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
×
UNCOV
1989
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
1990
      }
UNCOV
1991
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
×
UNCOV
1992
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
1993

UNCOV
1994
      mInfo("vgId:%d, will remove 1 vnode, replca:2", pVgroup->vgId);
×
UNCOV
1995
      newVg.replica--;
×
UNCOV
1996
      SVnodeGid del = newVg.vnodeGid[vnIndex];
×
UNCOV
1997
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
UNCOV
1998
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
1999
      {
UNCOV
2000
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
UNCOV
2001
        if (pRaw == NULL) {
×
2002
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2003
          if (terrno != 0) code = terrno;
×
2004
          TAOS_RETURN(code);
×
2005
        }
UNCOV
2006
        if ((code = mndTransAppendRedolog(pTrans, pRaw)) != 0) {
×
2007
          sdbFreeRaw(pRaw);
×
2008
          TAOS_RETURN(code);
×
2009
        }
UNCOV
2010
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
UNCOV
2011
        if (code != 0) {
×
2012
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2013
          return code;
×
2014
        }
2015
      }
2016

UNCOV
2017
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
×
UNCOV
2018
      for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
2019
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2020
      }
UNCOV
2021
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2022
#if 1
2023
    }
2024
#else
2025
    } else {  // new replica == 3
2026
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2027
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2028
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2029
      newVg.replica--;
2030
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2031
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2032
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2033
      {
2034
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2035
        if (pRaw == NULL) return -1;
2036
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2037
          sdbFreeRaw(pRaw);
2038
          return -1;
2039
        }
2040
      }
2041

2042
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2043
      for (int32_t i = 0; i < newVg.replica; ++i) {
2044
        if (i == vnIndex) continue;
2045
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2046
      }
2047
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2048
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2049
    }
2050
#endif
2051
  } else {
UNCOV
2052
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
UNCOV
2053
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
UNCOV
2054
    newVg.replica--;
×
2055
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
UNCOV
2056
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
UNCOV
2057
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2058
    {
UNCOV
2059
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
UNCOV
2060
      if (pRaw == NULL) {
×
2061
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2062
        if (terrno != 0) code = terrno;
×
2063
        TAOS_RETURN(code);
×
2064
      }
UNCOV
2065
      if ((code = mndTransAppendRedolog(pTrans, pRaw)) != 0) {
×
2066
        sdbFreeRaw(pRaw);
×
2067
        TAOS_RETURN(code);
×
2068
      }
UNCOV
2069
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
UNCOV
2070
      if (code != 0) {
×
2071
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2072
        return code;
×
2073
      }
2074
    }
2075

UNCOV
2076
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
2077
      if (i != vnIndex) {
×
UNCOV
2078
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2079
      }
2080
    }
UNCOV
2081
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
UNCOV
2082
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2083

UNCOV
2084
    if (newVg.replica == 1) {
×
UNCOV
2085
      if (force && !unsafe) {
×
UNCOV
2086
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2087
      }
2088

UNCOV
2089
      SSdb *pSdb = pMnode->pSdb;
×
UNCOV
2090
      void *pIter = NULL;
×
2091

UNCOV
2092
      while (1) {
×
UNCOV
2093
        SStbObj *pStb = NULL;
×
UNCOV
2094
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
UNCOV
2095
        if (pIter == NULL) break;
×
2096

UNCOV
2097
        if (strcmp(pStb->db, pDb->name) == 0) {
×
UNCOV
2098
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
2099
            sdbCancelFetch(pSdb, pIter);
×
2100
            sdbRelease(pSdb, pStb);
×
2101
            TAOS_RETURN(code);
×
2102
          }
2103
        }
2104

UNCOV
2105
        sdbRelease(pSdb, pStb);
×
2106
      }
2107

UNCOV
2108
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2109
    }
2110
  }
2111

2112
  {
UNCOV
2113
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
UNCOV
2114
    if (pRaw == NULL) {
×
2115
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2116
      if (terrno != 0) code = terrno;
×
2117
      TAOS_RETURN(code);
×
2118
    }
UNCOV
2119
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
×
2120
      sdbFreeRaw(pRaw);
×
2121
      TAOS_RETURN(code);
×
2122
    }
UNCOV
2123
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
UNCOV
2124
    if (code != 0) {
×
2125
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2126
      return code;
×
2127
    }
2128
  }
2129

UNCOV
2130
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
×
UNCOV
2131
  for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
2132
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
×
2133
  }
UNCOV
2134
  TAOS_RETURN(code);
×
2135
}
2136

UNCOV
2137
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
×
UNCOV
2138
  int32_t code = 0;
×
UNCOV
2139
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
×
UNCOV
2140
  if (pArray == NULL) {
×
2141
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2142
    if (terrno != 0) code = terrno;
×
2143
    TAOS_RETURN(code);
×
2144
  }
2145

UNCOV
2146
  void *pIter = NULL;
×
UNCOV
2147
  while (1) {
×
UNCOV
2148
    SVgObj *pVgroup = NULL;
×
UNCOV
2149
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
UNCOV
2150
    if (pIter == NULL) break;
×
2151

UNCOV
2152
    int32_t vnIndex = -1;
×
UNCOV
2153
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
2154
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
×
UNCOV
2155
        vnIndex = i;
×
UNCOV
2156
        break;
×
2157
      }
2158
    }
2159

UNCOV
2160
    code = 0;
×
UNCOV
2161
    if (vnIndex != -1) {
×
UNCOV
2162
      mInfo("vgId:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, vnIndex, delDnodeId, force);
×
UNCOV
2163
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
UNCOV
2164
      code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
×
UNCOV
2165
      mndReleaseDb(pMnode, pDb);
×
2166
    }
2167

UNCOV
2168
    sdbRelease(pMnode->pSdb, pVgroup);
×
2169

UNCOV
2170
    if (code != 0) {
×
UNCOV
2171
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
2172
      break;
×
2173
    }
2174
  }
2175

UNCOV
2176
  taosArrayDestroy(pArray);
×
UNCOV
2177
  TAOS_RETURN(code);
×
2178
}
2179

UNCOV
2180
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
2181
                                             int32_t newDnodeId) {
UNCOV
2182
  int32_t code = 0;
×
UNCOV
2183
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
×
2184

2185
  // assoc dnode
UNCOV
2186
  SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
×
UNCOV
2187
  pVgroup->replica++;
×
UNCOV
2188
  pGid->dnodeId = newDnodeId;
×
UNCOV
2189
  pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
×
UNCOV
2190
  pGid->nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2191

UNCOV
2192
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
2193
  if (pVgRaw == NULL) {
×
2194
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2195
    if (terrno != 0) code = terrno;
×
2196
    TAOS_RETURN(code);
×
2197
  }
UNCOV
2198
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
2199
    sdbFreeRaw(pVgRaw);
×
2200
    TAOS_RETURN(code);
×
2201
  }
UNCOV
2202
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
2203
  if (code != 0) {
×
2204
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
2205
    TAOS_RETURN(code);
×
2206
  }
2207

2208
  // learner
UNCOV
2209
  for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
×
UNCOV
2210
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
×
2211
  }
UNCOV
2212
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid));
×
2213

2214
  // voter
UNCOV
2215
  pGid->nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
2216
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pGid->dnodeId));
×
UNCOV
2217
  for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
×
UNCOV
2218
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
×
2219
  }
2220

2221
  // confirm
UNCOV
2222
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));
×
2223

UNCOV
2224
  TAOS_RETURN(code);
×
2225
}
2226

UNCOV
2227
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
2228
                                               int32_t delDnodeId) {
UNCOV
2229
  int32_t code = 0;
×
UNCOV
2230
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
×
2231

UNCOV
2232
  SVnodeGid *pGid = NULL;
×
UNCOV
2233
  SVnodeGid  delGid = {0};
×
UNCOV
2234
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
2235
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
×
UNCOV
2236
      pGid = &pVgroup->vnodeGid[i];
×
UNCOV
2237
      break;
×
2238
    }
2239
  }
2240

UNCOV
2241
  if (pGid == NULL) return 0;
×
2242

UNCOV
2243
  pVgroup->replica--;
×
UNCOV
2244
  memcpy(&delGid, pGid, sizeof(SVnodeGid));
×
UNCOV
2245
  memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
×
UNCOV
2246
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
2247

UNCOV
2248
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
2249
  if (pVgRaw == NULL) {
×
2250
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2251
    if (terrno != 0) code = terrno;
×
2252
    TAOS_RETURN(code);
×
2253
  }
UNCOV
2254
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
2255
    sdbFreeRaw(pVgRaw);
×
2256
    TAOS_RETURN(code);
×
2257
  }
UNCOV
2258
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
2259
  if (code != 0) {
×
2260
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
×
2261
    TAOS_RETURN(code);
×
2262
  }
2263

UNCOV
2264
  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
×
UNCOV
2265
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
2266
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
×
2267
  }
UNCOV
2268
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));
×
2269

UNCOV
2270
  TAOS_RETURN(code);
×
2271
}
2272

UNCOV
2273
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
×
2274
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2275
                                     SDnodeObj *pOld3) {
UNCOV
2276
  int32_t code = -1;
×
UNCOV
2277
  STrans *pTrans = NULL;
×
2278

UNCOV
2279
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
×
UNCOV
2280
  if (pTrans == NULL) {
×
2281
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2282
    if (terrno != 0) code = terrno;
×
2283
    goto _OVER;
×
2284
  }
2285

UNCOV
2286
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
×
UNCOV
2287
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
×
2288

UNCOV
2289
  mndTransSetSerial(pTrans);
×
UNCOV
2290
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
×
2291

UNCOV
2292
  SVgObj newVg = {0};
×
UNCOV
2293
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
×
UNCOV
2294
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
×
UNCOV
2295
  for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
2296
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
×
2297
          syncStr(newVg.vnodeGid[i].syncState));
2298
  }
2299

UNCOV
2300
  if (pNew1 != NULL && pOld1 != NULL) {
×
UNCOV
2301
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
×
UNCOV
2302
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
×
UNCOV
2303
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
×
2304
             pNew1->numOfSupportVnodes);
UNCOV
2305
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
UNCOV
2306
      goto _OVER;
×
2307
    }
2308

UNCOV
2309
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
UNCOV
2310
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
×
2311
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2312
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
2313
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2314
      goto _OVER;
×
2315
    } else {
UNCOV
2316
      pNew1->memUsed += vgMem;
×
2317
    }
2318

UNCOV
2319
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
×
UNCOV
2320
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
×
2321
  }
2322

UNCOV
2323
  if (pNew2 != NULL && pOld2 != NULL) {
×
UNCOV
2324
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
×
UNCOV
2325
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
×
2326
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2327
             pNew2->numOfSupportVnodes);
2328
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2329
      goto _OVER;
×
2330
    }
UNCOV
2331
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
UNCOV
2332
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
×
2333
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2334
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
2335
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2336
      goto _OVER;
×
2337
    } else {
UNCOV
2338
      pNew2->memUsed += vgMem;
×
2339
    }
UNCOV
2340
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
×
UNCOV
2341
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
×
2342
  }
2343

UNCOV
2344
  if (pNew3 != NULL && pOld3 != NULL) {
×
UNCOV
2345
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
×
UNCOV
2346
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
×
2347
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2348
             pNew3->numOfSupportVnodes);
2349
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2350
      goto _OVER;
×
2351
    }
UNCOV
2352
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
UNCOV
2353
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
×
2354
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2355
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
2356
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2357
      goto _OVER;
×
2358
    } else {
UNCOV
2359
      pNew3->memUsed += vgMem;
×
2360
    }
UNCOV
2361
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
×
UNCOV
2362
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
×
2363
  }
2364

2365
  {
UNCOV
2366
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
UNCOV
2367
    if (pRaw == NULL) {
×
2368
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2369
      if (terrno != 0) code = terrno;
×
2370
      goto _OVER;
×
2371
    }
UNCOV
2372
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
×
2373
      sdbFreeRaw(pRaw);
×
2374
      goto _OVER;
×
2375
    }
UNCOV
2376
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
UNCOV
2377
    if (code != 0) {
×
2378
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2379
      goto _OVER;
×
2380
    }
2381
  }
2382

UNCOV
2383
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
×
UNCOV
2384
  for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
2385
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
×
2386
  }
2387

UNCOV
2388
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
UNCOV
2389
  code = 0;
×
2390

UNCOV
2391
_OVER:
×
UNCOV
2392
  mndTransDrop(pTrans);
×
UNCOV
2393
  mndReleaseDb(pMnode, pDb);
×
UNCOV
2394
  TAOS_RETURN(code);
×
2395
}
2396

UNCOV
2397
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
×
UNCOV
2398
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
2399
  SDnodeObj *pNew1 = NULL;
×
UNCOV
2400
  SDnodeObj *pNew2 = NULL;
×
UNCOV
2401
  SDnodeObj *pNew3 = NULL;
×
UNCOV
2402
  SDnodeObj *pOld1 = NULL;
×
UNCOV
2403
  SDnodeObj *pOld2 = NULL;
×
UNCOV
2404
  SDnodeObj *pOld3 = NULL;
×
UNCOV
2405
  SVgObj    *pVgroup = NULL;
×
UNCOV
2406
  SDbObj    *pDb = NULL;
×
UNCOV
2407
  int32_t    code = -1;
×
UNCOV
2408
  int64_t    curMs = taosGetTimestampMs();
×
UNCOV
2409
  int32_t    newDnodeId[3] = {0};
×
UNCOV
2410
  int32_t    oldDnodeId[3] = {0};
×
UNCOV
2411
  int32_t    newIndex = -1;
×
UNCOV
2412
  int32_t    oldIndex = -1;
×
2413

UNCOV
2414
  SRedistributeVgroupReq req = {0};
×
UNCOV
2415
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
×
2416
    code = TSDB_CODE_INVALID_MSG;
×
2417
    goto _OVER;
×
2418
  }
2419

UNCOV
2420
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
×
UNCOV
2421
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
×
UNCOV
2422
    goto _OVER;
×
2423
  }
2424

UNCOV
2425
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
×
UNCOV
2426
  if (pVgroup == NULL) {
×
UNCOV
2427
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2428
    if (terrno != 0) code = terrno;
×
UNCOV
2429
    goto _OVER;
×
2430
  }
2431

UNCOV
2432
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
UNCOV
2433
  if (pDb == NULL) {
×
2434
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2435
    if (terrno != 0) code = terrno;
×
2436
    goto _OVER;
×
2437
  }
2438

UNCOV
2439
  if (pVgroup->replica == 1) {
×
UNCOV
2440
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
×
2441
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
2442
      goto _OVER;
×
2443
    }
2444

UNCOV
2445
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
×
2446
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
UNCOV
2447
      code = 0;
×
UNCOV
2448
      goto _OVER;
×
2449
    }
2450

UNCOV
2451
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
×
UNCOV
2452
    if (pNew1 == NULL) {
×
2453
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2454
      if (terrno != 0) code = terrno;
×
2455
      goto _OVER;
×
2456
    }
UNCOV
2457
    if (!mndIsDnodeOnline(pNew1, curMs)) {
×
2458
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2459
      goto _OVER;
×
2460
    }
2461

UNCOV
2462
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
×
UNCOV
2463
    if (pOld1 == NULL) {
×
2464
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2465
      if (terrno != 0) code = terrno;
×
2466
      goto _OVER;
×
2467
    }
UNCOV
2468
    if (!mndIsDnodeOnline(pOld1, curMs)) {
×
UNCOV
2469
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2470
      goto _OVER;
×
2471
    }
2472

UNCOV
2473
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
×
2474

UNCOV
2475
  } else if (pVgroup->replica == 3) {
×
UNCOV
2476
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
×
UNCOV
2477
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2478
      goto _OVER;
×
2479
    }
2480

UNCOV
2481
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
×
UNCOV
2482
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2483
      goto _OVER;
×
2484
    }
2485

UNCOV
2486
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
×
UNCOV
2487
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
×
UNCOV
2488
      newDnodeId[++newIndex] = req.dnodeId1;
×
UNCOV
2489
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
×
2490
    }
2491

UNCOV
2492
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
×
UNCOV
2493
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
×
UNCOV
2494
      newDnodeId[++newIndex] = req.dnodeId2;
×
UNCOV
2495
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
×
2496
    }
2497

UNCOV
2498
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
×
UNCOV
2499
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
×
UNCOV
2500
      newDnodeId[++newIndex] = req.dnodeId3;
×
UNCOV
2501
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
×
2502
    }
2503

UNCOV
2504
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
×
UNCOV
2505
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
×
UNCOV
2506
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
×
UNCOV
2507
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
×
2508
    }
2509

UNCOV
2510
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
×
UNCOV
2511
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
×
UNCOV
2512
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
×
UNCOV
2513
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
×
2514
    }
2515

UNCOV
2516
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
×
UNCOV
2517
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
×
UNCOV
2518
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
×
UNCOV
2519
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
×
2520
    }
2521

UNCOV
2522
    if (newDnodeId[0] != 0) {
×
UNCOV
2523
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
×
UNCOV
2524
      if (pNew1 == NULL) {
×
2525
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2526
        if (terrno != 0) code = terrno;
×
2527
        goto _OVER;
×
2528
      }
UNCOV
2529
      if (!mndIsDnodeOnline(pNew1, curMs)) {
×
UNCOV
2530
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2531
        goto _OVER;
×
2532
      }
2533
    }
2534

UNCOV
2535
    if (newDnodeId[1] != 0) {
×
UNCOV
2536
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
×
UNCOV
2537
      if (pNew2 == NULL) {
×
2538
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2539
        if (terrno != 0) code = terrno;
×
2540
        goto _OVER;
×
2541
      }
UNCOV
2542
      if (!mndIsDnodeOnline(pNew2, curMs)) {
×
2543
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2544
        goto _OVER;
×
2545
      }
2546
    }
2547

UNCOV
2548
    if (newDnodeId[2] != 0) {
×
UNCOV
2549
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
×
UNCOV
2550
      if (pNew3 == NULL) {
×
2551
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2552
        if (terrno != 0) code = terrno;
×
2553
        goto _OVER;
×
2554
      }
UNCOV
2555
      if (!mndIsDnodeOnline(pNew3, curMs)) {
×
2556
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2557
        goto _OVER;
×
2558
      }
2559
    }
2560

UNCOV
2561
    if (oldDnodeId[0] != 0) {
×
UNCOV
2562
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
×
UNCOV
2563
      if (pOld1 == NULL) {
×
2564
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2565
        if (terrno != 0) code = terrno;
×
2566
        goto _OVER;
×
2567
      }
UNCOV
2568
      if (!mndIsDnodeOnline(pOld1, curMs)) {
×
UNCOV
2569
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2570
        goto _OVER;
×
2571
      }
2572
    }
2573

UNCOV
2574
    if (oldDnodeId[1] != 0) {
×
UNCOV
2575
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
×
UNCOV
2576
      if (pOld2 == NULL) {
×
2577
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2578
        if (terrno != 0) code = terrno;
×
2579
        goto _OVER;
×
2580
      }
UNCOV
2581
      if (!mndIsDnodeOnline(pOld2, curMs)) {
×
2582
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2583
        goto _OVER;
×
2584
      }
2585
    }
2586

UNCOV
2587
    if (oldDnodeId[2] != 0) {
×
UNCOV
2588
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
×
UNCOV
2589
      if (pOld3 == NULL) {
×
2590
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2591
        if (terrno != 0) code = terrno;
×
2592
        goto _OVER;
×
2593
      }
UNCOV
2594
      if (!mndIsDnodeOnline(pOld3, curMs)) {
×
2595
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2596
        goto _OVER;
×
2597
      }
2598
    }
2599

UNCOV
2600
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
×
2601
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
UNCOV
2602
      code = 0;
×
UNCOV
2603
      goto _OVER;
×
2604
    }
2605

UNCOV
2606
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
×
2607

2608
  } else {
2609
    code = TSDB_CODE_MND_REQ_REJECTED;
×
2610
    goto _OVER;
×
2611
  }
2612

UNCOV
2613
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
2614

UNCOV
2615
  char obj[33] = {0};
×
UNCOV
2616
  (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
×
2617

UNCOV
2618
  auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
×
2619

UNCOV
2620
_OVER:
×
UNCOV
2621
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
2622
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
×
2623
           req.dnodeId3, tstrerror(code));
2624
  }
2625

UNCOV
2626
  mndReleaseDnode(pMnode, pNew1);
×
UNCOV
2627
  mndReleaseDnode(pMnode, pNew2);
×
UNCOV
2628
  mndReleaseDnode(pMnode, pNew3);
×
UNCOV
2629
  mndReleaseDnode(pMnode, pOld1);
×
UNCOV
2630
  mndReleaseDnode(pMnode, pOld2);
×
UNCOV
2631
  mndReleaseDnode(pMnode, pOld3);
×
UNCOV
2632
  mndReleaseVgroup(pMnode, pVgroup);
×
UNCOV
2633
  mndReleaseDb(pMnode, pDb);
×
UNCOV
2634
  tFreeSRedistributeVgroupReq(&req);
×
2635

UNCOV
2636
  TAOS_RETURN(code);
×
2637
}
2638

UNCOV
2639
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
×
UNCOV
2640
  SForceBecomeFollowerReq balanceReq = {
×
UNCOV
2641
      .vgId = pVgroup->vgId,
×
2642
  };
2643

UNCOV
2644
  int32_t contLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
×
UNCOV
2645
  if (contLen < 0) {
×
2646
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2647
    return NULL;
×
2648
  }
UNCOV
2649
  contLen += sizeof(SMsgHead);
×
2650

UNCOV
2651
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
2652
  if (pReq == NULL) {
×
2653
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2654
    return NULL;
×
2655
  }
2656

UNCOV
2657
  SMsgHead *pHead = pReq;
×
UNCOV
2658
  pHead->contLen = htonl(contLen);
×
UNCOV
2659
  pHead->vgId = htonl(pVgroup->vgId);
×
2660

UNCOV
2661
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), contLen, &balanceReq) < 0) {
×
2662
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2663
    taosMemoryFree(pReq);
×
2664
    return NULL;
×
2665
  }
UNCOV
2666
  *pContLen = contLen;
×
UNCOV
2667
  return pReq;
×
2668
}
2669

UNCOV
2670
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
×
UNCOV
2671
  int32_t    code = 0;
×
UNCOV
2672
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
UNCOV
2673
  if (pDnode == NULL) {
×
2674
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2675
    if (terrno != 0) code = terrno;
×
2676
    TAOS_RETURN(code);
×
2677
  }
2678

UNCOV
2679
  STransAction action = {0};
×
UNCOV
2680
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
2681
  mndReleaseDnode(pMnode, pDnode);
×
2682

UNCOV
2683
  int32_t contLen = 0;
×
UNCOV
2684
  void   *pReq = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen);
×
UNCOV
2685
  if (pReq == NULL) {
×
2686
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2687
    if (terrno != 0) code = terrno;
×
2688
    TAOS_RETURN(code);
×
2689
  }
2690

UNCOV
2691
  action.pCont = pReq;
×
UNCOV
2692
  action.contLen = contLen;
×
UNCOV
2693
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;
×
2694

UNCOV
2695
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
2696
    taosMemoryFree(pReq);
×
2697
    TAOS_RETURN(code);
×
2698
  }
2699

UNCOV
2700
  TAOS_RETURN(code);
×
2701
}
2702

UNCOV
2703
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans) {
×
UNCOV
2704
  int32_t code = 0;
×
UNCOV
2705
  SSdb   *pSdb = pMnode->pSdb;
×
2706

UNCOV
2707
  int32_t vgid = pVgroup->vgId;
×
UNCOV
2708
  int8_t  replica = pVgroup->replica;
×
2709

UNCOV
2710
  if (pVgroup->replica <= 1) {
×
UNCOV
2711
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
×
UNCOV
2712
    return -1;
×
2713
  }
2714

UNCOV
2715
  int32_t dnodeId = 0;
×
2716

UNCOV
2717
  for (int i = 0; i < replica; i++) {
×
UNCOV
2718
    if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER) {
×
UNCOV
2719
      dnodeId = pVgroup->vnodeGid[i].dnodeId;
×
UNCOV
2720
      break;
×
2721
    }
2722
  }
2723

UNCOV
2724
  bool       exist = false;
×
UNCOV
2725
  bool       online = false;
×
UNCOV
2726
  int64_t    curMs = taosGetTimestampMs();
×
UNCOV
2727
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
UNCOV
2728
  if (pDnode != NULL) {
×
UNCOV
2729
    exist = true;
×
UNCOV
2730
    online = mndIsDnodeOnline(pDnode, curMs);
×
UNCOV
2731
    mndReleaseDnode(pMnode, pDnode);
×
2732
  }
2733

UNCOV
2734
  if (exist && online) {
×
UNCOV
2735
    mInfo("trans:%d, vgid:%d leader to dnode:%d", pTrans->id, vgid, dnodeId);
×
2736

UNCOV
2737
    if ((code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId)) != 0) {
×
2738
      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
×
2739
      TAOS_RETURN(code);
×
2740
    }
2741

UNCOV
2742
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
UNCOV
2743
    if (pDb == NULL) {
×
2744
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2745
      if (terrno != 0) code = terrno;
×
2746
      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
×
2747
      TAOS_RETURN(code);
×
2748
    }
2749

UNCOV
2750
    mndReleaseDb(pMnode, pDb);
×
2751
  } else {
UNCOV
2752
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
×
2753
          online);
2754
  }
2755

UNCOV
2756
  TAOS_RETURN(code);
×
2757
}
2758

2759
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);
2760

UNCOV
2761
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) { return mndProcessVgroupBalanceLeaderMsgImp(pReq); }
×
2762

2763
#ifndef TD_ENTERPRISE
2764
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) { return 0; }
2765
#endif
2766

UNCOV
2767
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
×
2768
                                   SVgObj *pNewVgroup, SArray *pArray) {
UNCOV
2769
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
×
UNCOV
2770
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
UNCOV
2771
    bool       inVgroup = false;
×
UNCOV
2772
    int64_t    oldMemUsed = 0;
×
UNCOV
2773
    int64_t    newMemUsed = 0;
×
UNCOV
2774
    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
×
2775
           pDnode->id, pDnode->memAvail, pDnode->memUsed);
UNCOV
2776
    for (int32_t j = 0; j < pOldVgroup->replica; ++j) {
×
UNCOV
2777
      SVnodeGid *pVgId = &pOldVgroup->vnodeGid[j];
×
UNCOV
2778
      if (pDnode->id == pVgId->dnodeId) {
×
UNCOV
2779
        oldMemUsed = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
×
UNCOV
2780
        inVgroup = true;
×
2781
      }
2782
    }
UNCOV
2783
    for (int32_t j = 0; j < pNewVgroup->replica; ++j) {
×
UNCOV
2784
      SVnodeGid *pVgId = &pNewVgroup->vnodeGid[j];
×
UNCOV
2785
      if (pDnode->id == pVgId->dnodeId) {
×
UNCOV
2786
        newMemUsed = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
×
UNCOV
2787
        inVgroup = true;
×
2788
      }
2789
    }
2790

UNCOV
2791
    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
×
2792
           pNewVgroup->vgId, pDnode->id, oldMemUsed, newMemUsed);
2793

UNCOV
2794
    pDnode->memUsed = pDnode->memUsed - oldMemUsed + newMemUsed;
×
UNCOV
2795
    if (pDnode->memAvail - pDnode->memUsed <= 0) {
×
2796
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
×
2797
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
2798
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
×
UNCOV
2799
    } else if (inVgroup) {
×
UNCOV
2800
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
×
2801
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
2802
    } else {
2803
    }
2804
  }
UNCOV
2805
  return 0;
×
2806
}
2807

UNCOV
2808
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
2809
                                  SArray *pArray, SVgObj *pNewVgroup) {
UNCOV
2810
  int32_t code = 0;
×
UNCOV
2811
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
×
2812

UNCOV
2813
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
UNCOV
2814
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
UNCOV
2815
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
×
UNCOV
2816
    return 0;
×
2817
  }
2818

UNCOV
2819
  mndTransSetSerial(pTrans);
×
2820

UNCOV
2821
  if (pNewDb->cfg.replications == 3) {
×
UNCOV
2822
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2823
          pVgroup->vnodeGid[0].dnodeId);
2824

2825
    // add second
UNCOV
2826
    if (pNewVgroup->replica == 1) {
×
UNCOV
2827
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
2828
    }
2829

2830
    // learner stage
UNCOV
2831
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
2832
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
UNCOV
2833
    TAOS_CHECK_RETURN(
×
2834
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2835

UNCOV
2836
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
×
2837

2838
    // follower stage
UNCOV
2839
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
2840
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
×
UNCOV
2841
    TAOS_CHECK_RETURN(
×
2842
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2843

UNCOV
2844
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
2845

2846
    // add third
UNCOV
2847
    if (pNewVgroup->replica == 2) {
×
UNCOV
2848
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
2849
    }
2850

UNCOV
2851
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
2852
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
2853
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
2854
    TAOS_CHECK_RETURN(
×
2855
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
UNCOV
2856
    TAOS_CHECK_RETURN(
×
2857
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
UNCOV
2858
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
×
2859

UNCOV
2860
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
UNCOV
2861
  } else if (pNewDb->cfg.replications == 1) {
×
UNCOV
2862
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
2863
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
2864

UNCOV
2865
    SVnodeGid del1 = {0};
×
UNCOV
2866
    SVnodeGid del2 = {0};
×
UNCOV
2867
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
×
UNCOV
2868
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
×
UNCOV
2869
    TAOS_CHECK_RETURN(
×
2870
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
UNCOV
2871
    TAOS_CHECK_RETURN(
×
2872
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
UNCOV
2873
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
2874

UNCOV
2875
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
×
UNCOV
2876
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
×
UNCOV
2877
    TAOS_CHECK_RETURN(
×
2878
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
UNCOV
2879
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
2880
  } else if (pNewDb->cfg.replications == 2) {
×
2881
    mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2882
          pVgroup->vnodeGid[0].dnodeId);
2883

2884
    // add second
2885
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
×
2886

2887
    // learner stage
2888
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2889
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2890
    TAOS_CHECK_RETURN(
×
2891
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2892

2893
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
×
2894

2895
    // follower stage
2896
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2897
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
×
2898
    TAOS_CHECK_RETURN(
×
2899
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
2900

2901
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
×
2902
  } else {
2903
    return -1;
×
2904
  }
2905

UNCOV
2906
  mndSortVnodeGid(pNewVgroup);
×
2907

2908
  {
UNCOV
2909
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
×
UNCOV
2910
    if (pVgRaw == NULL) {
×
2911
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2912
      if (terrno != 0) code = terrno;
×
2913
      TAOS_RETURN(code);
×
2914
    }
UNCOV
2915
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
2916
      sdbFreeRaw(pVgRaw);
×
2917
      TAOS_RETURN(code);
×
2918
    }
UNCOV
2919
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
2920
    if (code != 0) {
×
2921
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
2922
      TAOS_RETURN(code);
×
2923
    }
2924
  }
2925

UNCOV
2926
  TAOS_RETURN(code);
×
2927
}
2928

2929
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
2930
                                      SArray *pArray) {
2931
  int32_t code = 0;
×
2932
  SVgObj  newVgroup = {0};
×
2933
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
2934

2935
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
2936
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
2937
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
2938
    return 0;
×
2939
  }
2940

2941
  mndTransSetSerial(pTrans);
×
2942

2943
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2944
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
2945

2946
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
2947
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
2948
          pVgroup->vnodeGid[0].dnodeId);
2949

2950
    // add second
2951
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
2952
    // add third
2953
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
2954

2955
    // add learner stage
2956
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2957
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2958
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2959
    TAOS_CHECK_RETURN(
×
2960
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2961
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
2962
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2963
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
2964
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2965
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2966
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
2967
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
2968
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
2969

2970
    // check learner
2971
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2972
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2973
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2974
    TAOS_CHECK_RETURN(
×
2975
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
2976
    TAOS_CHECK_RETURN(
×
2977
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
2978

2979
    // change raft type
2980
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2981
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2982
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2983
    TAOS_CHECK_RETURN(
×
2984
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2985

2986
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2987

2988
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2989
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2990
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2991
    TAOS_CHECK_RETURN(
×
2992
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
2993

2994
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
2995

2996
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
2997
    if (pVgRaw == NULL) {
×
2998
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2999
      if (terrno != 0) code = terrno;
×
3000
      TAOS_RETURN(code);
×
3001
    }
3002
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3003
      sdbFreeRaw(pVgRaw);
×
3004
      TAOS_RETURN(code);
×
3005
    }
3006
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3007
    if (code != 0) {
×
3008
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3009
             __LINE__);
3010
      TAOS_RETURN(code);
×
3011
    }
3012
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3013
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3014
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3015

3016
    SVnodeGid del1 = {0};
×
3017
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3018

3019
    TAOS_CHECK_RETURN(
×
3020
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3021

3022
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3023

3024
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3025

3026
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3027
    if (pVgRaw == NULL) {
×
3028
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3029
      if (terrno != 0) code = terrno;
×
3030
      TAOS_RETURN(code);
×
3031
    }
3032
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3033
      sdbFreeRaw(pVgRaw);
×
3034
      TAOS_RETURN(code);
×
3035
    }
3036
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3037
    if (code != 0) {
×
3038
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3039
             __LINE__);
3040
      TAOS_RETURN(code);
×
3041
    }
3042

3043
    SVnodeGid del2 = {0};
×
3044
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3045

3046
    TAOS_CHECK_RETURN(
×
3047
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3048

3049
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3050

3051
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3052

3053
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3054
    if (pVgRaw == NULL) {
×
3055
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3056
      if (terrno != 0) code = terrno;
×
3057
      TAOS_RETURN(code);
×
3058
    }
3059
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3060
      sdbFreeRaw(pVgRaw);
×
3061
      TAOS_RETURN(code);
×
3062
    }
3063
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3064
    if (code != 0) {
×
3065
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3066
             __LINE__);
3067
      TAOS_RETURN(code);
×
3068
    }
3069
  } else {
3070
    return -1;
×
3071
  }
3072

3073
  mndSortVnodeGid(&newVgroup);
×
3074

3075
  {
3076
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3077
    if (pVgRaw == NULL) {
×
3078
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3079
      if (terrno != 0) code = terrno;
×
3080
      TAOS_RETURN(code);
×
3081
    }
3082
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3083
      sdbFreeRaw(pVgRaw);
×
3084
      TAOS_RETURN(code);
×
3085
    }
3086
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3087
    if (code != 0) {
×
3088
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3089
             __LINE__);
3090
      TAOS_RETURN(code);
×
3091
    }
3092
  }
3093

3094
  TAOS_RETURN(code);
×
3095
}
3096

UNCOV
3097
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
×
3098
                                         SDnodeObj *pAnotherDnode) {
UNCOV
3099
  int32_t code = 0;
×
UNCOV
3100
  SVgObj  newVgroup = {0};
×
UNCOV
3101
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3102

UNCOV
3103
  mInfo("db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);
×
3104

UNCOV
3105
  if (newVgroup.replica == 1) {
×
3106
    int selected = 0;
×
3107
    for (int i = 0; i < newVgroup.replica; i++) {
×
3108
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3109
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3110
        selected = i;
×
3111
      }
3112
    }
3113
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]));
×
UNCOV
3114
  } else if (newVgroup.replica == 2) {
×
3115
    for (int i = 0; i < newVgroup.replica; i++) {
×
3116
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3117
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3118
      } else {
3119
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3120
      }
3121
    }
3122
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
×
3123

3124
    for (int i = 0; i < newVgroup.replica; i++) {
×
3125
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3126
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3127
      } else {
3128
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3129
      }
3130
    }
3131
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3132

3133
    for (int i = 0; i < newVgroup.replica; i++) {
×
3134
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3135
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3136
      }
3137
    }
3138
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3139
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
×
UNCOV
3140
  } else if (newVgroup.replica == 3) {
×
UNCOV
3141
    for (int i = 0; i < newVgroup.replica; i++) {
×
UNCOV
3142
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
UNCOV
3143
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3144
      } else {
UNCOV
3145
        newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3146
      }
3147
    }
UNCOV
3148
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3149

UNCOV
3150
    for (int i = 0; i < newVgroup.replica; i++) {
×
UNCOV
3151
      newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3152
      if (newVgroup.vnodeGid[i].dnodeId == pDnode->id) {
×
3153
      }
3154
    }
UNCOV
3155
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
×
3156
  }
UNCOV
3157
  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
UNCOV
3158
  if (pVgRaw == NULL) {
×
3159
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3160
    if (terrno != 0) code = terrno;
×
3161
    TAOS_RETURN(code);
×
3162
  }
UNCOV
3163
  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3164
    sdbFreeRaw(pVgRaw);
×
3165
    TAOS_RETURN(code);
×
3166
  }
UNCOV
3167
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
3168
  if (code != 0) {
×
3169
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
×
3170
    TAOS_RETURN(code);
×
3171
  }
3172

UNCOV
3173
  TAOS_RETURN(code);
×
3174
}
3175

3176
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
3177
  return 0;
×
3178
}
3179

3180
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3181

UNCOV
3182
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
×
UNCOV
3183
  int32_t         code = 0;
×
UNCOV
3184
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
×
UNCOV
3185
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
×
UNCOV
3186
  if (pRaw == NULL) {
×
3187
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3188
    if (terrno != 0) code = terrno;
×
3189
    goto _err;
×
3190
  }
UNCOV
3191
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
×
UNCOV
3192
  code = sdbSetRawStatus(pRaw, vgStatus);
×
UNCOV
3193
  if (code != 0) {
×
3194
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
3195
    goto _err;
×
3196
  }
UNCOV
3197
  pRaw = NULL;
×
UNCOV
3198
  TAOS_RETURN(code);
×
3199
_err:
×
3200
  sdbFreeRaw(pRaw);
×
3201
  TAOS_RETURN(code);
×
3202
}
3203

UNCOV
3204
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
×
UNCOV
3205
  int32_t         code = 0;
×
UNCOV
3206
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
×
UNCOV
3207
  SSdbRaw        *pRaw = mndDbActionEncode(pDb);
×
UNCOV
3208
  if (pRaw == NULL) {
×
3209
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3210
    if (terrno != 0) code = terrno;
×
3211
    goto _err;
×
3212
  }
UNCOV
3213
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
×
UNCOV
3214
  code = sdbSetRawStatus(pRaw, dbStatus);
×
UNCOV
3215
  if (code != 0) {
×
3216
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
×
3217
    goto _err;
×
3218
  }
UNCOV
3219
  pRaw = NULL;
×
UNCOV
3220
  TAOS_RETURN(code);
×
3221
_err:
×
3222
  sdbFreeRaw(pRaw);
×
3223
  TAOS_RETURN(code);
×
3224
}
3225

UNCOV
3226
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
3227
  int32_t code = -1;
×
UNCOV
3228
  STrans *pTrans = NULL;
×
UNCOV
3229
  SDbObj  dbObj = {0};
×
UNCOV
3230
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
×
3231

UNCOV
3232
  int32_t numOfStreams = 0;
×
UNCOV
3233
  if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) {
×
3234
    goto _OVER;
×
3235
  }
UNCOV
3236
  if (numOfStreams > 0) {
×
3237
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
×
3238
    goto _OVER;
×
3239
  }
3240

3241
#if defined(USE_S3)
3242
  extern int8_t tsS3Enabled;
UNCOV
3243
  if (tsS3Enabled) {
×
3244
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
3245
    mError("vgId:%d, db:%s, s3 exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3246
    goto _OVER;
×
3247
  }
3248
#endif
3249

UNCOV
3250
  if (pDb->cfg.withArbitrator) {
×
3251
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
3252
    mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3253
    goto _OVER;
×
3254
  }
3255

UNCOV
3256
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
×
UNCOV
3257
  if (pTrans == NULL) {
×
3258
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3259
    if (terrno != 0) code = terrno;
×
3260
    goto _OVER;
×
3261
  }
UNCOV
3262
  mndTransSetSerial(pTrans);
×
UNCOV
3263
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
×
3264

UNCOV
3265
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
UNCOV
3266
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
×
3267

UNCOV
3268
  SVgObj newVg1 = {0};
×
UNCOV
3269
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
×
UNCOV
3270
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
×
3271
        newVg1.hashBegin, newVg1.hashEnd);
UNCOV
3272
  for (int32_t i = 0; i < newVg1.replica; ++i) {
×
UNCOV
3273
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
×
3274
  }
3275

UNCOV
3276
  if (newVg1.replica == 1) {
×
UNCOV
3277
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);
×
3278

UNCOV
3279
    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
UNCOV
3280
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
×
3281
                    _OVER);
UNCOV
3282
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);
×
3283

UNCOV
3284
    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3285
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
×
UNCOV
3286
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
×
3287
                    _OVER);
3288

UNCOV
3289
    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
×
UNCOV
3290
  } else if (newVg1.replica == 3) {
×
UNCOV
3291
    SVnodeGid del1 = {0};
×
UNCOV
3292
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
×
UNCOV
3293
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
×
UNCOV
3294
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
×
3295
                    _OVER);
UNCOV
3296
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
×
3297
                    _OVER);
3298
  } else {
3299
    goto _OVER;
×
3300
  }
3301

UNCOV
3302
  for (int32_t i = 0; i < newVg1.replica; ++i) {
×
UNCOV
3303
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
×
3304
                    _OVER);
3305
  }
UNCOV
3306
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
×
3307

UNCOV
3308
  SVgObj newVg2 = {0};
×
UNCOV
3309
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
×
UNCOV
3310
  newVg1.replica = 1;
×
UNCOV
3311
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
×
UNCOV
3312
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));
×
3313

UNCOV
3314
  newVg2.replica = 1;
×
UNCOV
3315
  newVg2.hashBegin = newVg1.hashEnd + 1;
×
UNCOV
3316
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
×
UNCOV
3317
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));
×
3318

UNCOV
3319
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
×
3320
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
UNCOV
3321
  for (int32_t i = 0; i < newVg1.replica; ++i) {
×
UNCOV
3322
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
×
3323
  }
UNCOV
3324
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
×
3325
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
UNCOV
3326
  for (int32_t i = 0; i < newVg1.replica; ++i) {
×
UNCOV
3327
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
×
3328
  }
3329

3330
  // alter vgId and hash range
UNCOV
3331
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
×
UNCOV
3332
  int32_t srcVgId = newVg1.vgId;
×
UNCOV
3333
  newVg1.vgId = maxVgId;
×
UNCOV
3334
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
×
UNCOV
3335
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);
×
3336

UNCOV
3337
  maxVgId++;
×
UNCOV
3338
  srcVgId = newVg2.vgId;
×
UNCOV
3339
  newVg2.vgId = maxVgId;
×
UNCOV
3340
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
×
UNCOV
3341
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);
×
3342

UNCOV
3343
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
×
UNCOV
3344
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);
×
3345

UNCOV
3346
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
×
UNCOV
3347
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
×
UNCOV
3348
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);
×
3349

3350
  // update db status
UNCOV
3351
  memcpy(&dbObj, pDb, sizeof(SDbObj));
×
UNCOV
3352
  if (dbObj.cfg.pRetensions != NULL) {
×
3353
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
×
3354
    if (dbObj.cfg.pRetensions == NULL) {
×
3355
      code = terrno;
×
3356
      goto _OVER;
×
3357
    }
3358
  }
UNCOV
3359
  dbObj.vgVersion++;
×
UNCOV
3360
  dbObj.updateTime = taosGetTimestampMs();
×
UNCOV
3361
  dbObj.cfg.numOfVgroups++;
×
UNCOV
3362
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
×
3363

3364
  // adjust vgroup replica
UNCOV
3365
  if (pDb->cfg.replications != newVg1.replica) {
×
UNCOV
3366
    SVgObj tmpGroup = {0};
×
UNCOV
3367
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
×
3368
  } else {
UNCOV
3369
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
×
3370
  }
3371

UNCOV
3372
  if (pDb->cfg.replications != newVg2.replica) {
×
UNCOV
3373
    SVgObj tmpGroup = {0};
×
UNCOV
3374
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
×
3375
  } else {
UNCOV
3376
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
×
3377
  }
3378

UNCOV
3379
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
×
3380

3381
  // commit db status
UNCOV
3382
  dbObj.vgVersion++;
×
UNCOV
3383
  dbObj.updateTime = taosGetTimestampMs();
×
UNCOV
3384
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
×
3385

UNCOV
3386
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
UNCOV
3387
  code = 0;
×
3388

UNCOV
3389
_OVER:
×
UNCOV
3390
  taosArrayDestroy(pArray);
×
UNCOV
3391
  mndTransDrop(pTrans);
×
UNCOV
3392
  taosArrayDestroy(dbObj.cfg.pRetensions);
×
UNCOV
3393
  TAOS_RETURN(code);
×
3394
}
3395

3396
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);
3397

UNCOV
3398
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); }
×
3399

3400
#ifndef TD_ENTERPRISE
3401
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; }
3402
#endif
3403

UNCOV
3404
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
3405
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
UNCOV
3406
  int32_t code = 0;
×
UNCOV
3407
  SVgObj  newVg = {0};
×
UNCOV
3408
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
×
UNCOV
3409
  mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica);
×
UNCOV
3410
  for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
3411
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
×
3412
  }
3413

UNCOV
3414
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id));
×
UNCOV
3415
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id));
×
3416

3417
  {
UNCOV
3418
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
UNCOV
3419
    if (pRaw == NULL) {
×
3420
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3421
      if (terrno != 0) code = terrno;
×
3422
      TAOS_RETURN(code);
×
3423
    }
UNCOV
3424
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
×
3425
      sdbFreeRaw(pRaw);
×
3426
      TAOS_RETURN(code);
×
3427
    }
UNCOV
3428
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
UNCOV
3429
    if (code != 0) {
×
3430
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
3431
      TAOS_RETURN(code);
×
3432
    }
3433
  }
3434

UNCOV
3435
  mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica);
×
UNCOV
3436
  for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
3437
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
×
3438
  }
UNCOV
3439
  TAOS_RETURN(code);
×
3440
}
3441

UNCOV
3442
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
×
3443
                                            SHashObj *pBalancedVgroups) {
UNCOV
3444
  void   *pIter = NULL;
×
UNCOV
3445
  int32_t code = -1;
×
UNCOV
3446
  SSdb   *pSdb = pMnode->pSdb;
×
3447

UNCOV
3448
  while (1) {
×
UNCOV
3449
    SVgObj *pVgroup = NULL;
×
UNCOV
3450
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
UNCOV
3451
    if (pIter == NULL) break;
×
UNCOV
3452
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
×
UNCOV
3453
      sdbRelease(pSdb, pVgroup);
×
UNCOV
3454
      continue;
×
3455
    }
3456

UNCOV
3457
    bool existInSrc = false;
×
UNCOV
3458
    bool existInDst = false;
×
UNCOV
3459
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
×
UNCOV
3460
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
×
UNCOV
3461
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
×
UNCOV
3462
      if (pGid->dnodeId == pDst->id) existInDst = true;
×
3463
    }
3464

UNCOV
3465
    if (!existInSrc || existInDst) {
×
UNCOV
3466
      sdbRelease(pSdb, pVgroup);
×
UNCOV
3467
      continue;
×
3468
    }
3469

UNCOV
3470
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
UNCOV
3471
    if (pDb == NULL) {
×
3472
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3473
      if (terrno != 0) code = terrno;
×
3474
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3475
      goto _OUT;
×
3476
    }
3477

UNCOV
3478
    if (pDb->cfg.withArbitrator) {
×
3479
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
3480
      goto _OUT;
×
3481
    }
3482

UNCOV
3483
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
×
UNCOV
3484
    if (code == 0) {
×
UNCOV
3485
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
×
3486
    }
3487

3488
  _OUT:
×
UNCOV
3489
    mndReleaseDb(pMnode, pDb);
×
UNCOV
3490
    sdbRelease(pSdb, pVgroup);
×
UNCOV
3491
    sdbCancelFetch(pSdb, pIter);
×
UNCOV
3492
    break;
×
3493
  }
3494

UNCOV
3495
  return code;
×
3496
}
3497

UNCOV
3498
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
×
UNCOV
3499
  int32_t   code = -1;
×
UNCOV
3500
  int32_t   numOfVgroups = 0;
×
UNCOV
3501
  STrans   *pTrans = NULL;
×
UNCOV
3502
  SHashObj *pBalancedVgroups = NULL;
×
3503

UNCOV
3504
  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
UNCOV
3505
  if (pBalancedVgroups == NULL) goto _OVER;
×
3506

UNCOV
3507
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
×
UNCOV
3508
  if (pTrans == NULL) {
×
3509
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3510
    if (terrno != 0) code = terrno;
×
3511
    goto _OVER;
×
3512
  }
UNCOV
3513
  mndTransSetSerial(pTrans);
×
UNCOV
3514
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
×
UNCOV
3515
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
UNCOV
3516
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
×
3517

UNCOV
3518
  while (1) {
×
UNCOV
3519
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
UNCOV
3520
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
UNCOV
3521
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
UNCOV
3522
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
×
3523
            pDnode->numOfSupportVnodes, pDnode->numOfOtherNodes, mndGetDnodeScore(pDnode, 0, 1));
3524
    }
3525

UNCOV
3526
    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
×
UNCOV
3527
    SDnodeObj *pDst = taosArrayGet(pArray, 0);
×
3528

UNCOV
3529
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
×
UNCOV
3530
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
×
UNCOV
3531
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, dstScore,
×
3532
          pDst->id, dstScore);
3533

UNCOV
3534
    if (srcScore > dstScore - 0.000001) {
×
UNCOV
3535
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
×
UNCOV
3536
      if (code == 0) {
×
UNCOV
3537
        pSrc->numOfVnodes--;
×
UNCOV
3538
        pDst->numOfVnodes++;
×
UNCOV
3539
        numOfVgroups++;
×
UNCOV
3540
        continue;
×
3541
      } else {
3542
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
×
3543
        break;
×
3544
      }
3545
    } else {
UNCOV
3546
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
×
UNCOV
3547
      break;
×
3548
    }
3549
  }
3550

UNCOV
3551
  if (numOfVgroups <= 0) {
×
3552
    mInfo("no need to balance vgroup");
×
3553
    code = 0;
×
3554
  } else {
UNCOV
3555
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
×
UNCOV
3556
    if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
×
UNCOV
3557
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
3558
  }
3559

UNCOV
3560
_OVER:
×
UNCOV
3561
  taosHashCleanup(pBalancedVgroups);
×
UNCOV
3562
  mndTransDrop(pTrans);
×
UNCOV
3563
  TAOS_RETURN(code);
×
3564
}
3565

UNCOV
3566
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
×
UNCOV
3567
  SMnode *pMnode = pReq->info.node;
×
UNCOV
3568
  int32_t code = -1;
×
UNCOV
3569
  SArray *pArray = NULL;
×
UNCOV
3570
  void   *pIter = NULL;
×
UNCOV
3571
  int64_t curMs = taosGetTimestampMs();
×
3572

UNCOV
3573
  SBalanceVgroupReq req = {0};
×
UNCOV
3574
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
×
3575
    code = TSDB_CODE_INVALID_MSG;
×
3576
    goto _OVER;
×
3577
  }
3578

UNCOV
3579
  mInfo("start to balance vgroup");
×
UNCOV
3580
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP)) != 0) {
×
UNCOV
3581
    goto _OVER;
×
3582
  }
3583

UNCOV
3584
  while (1) {
×
UNCOV
3585
    SDnodeObj *pDnode = NULL;
×
UNCOV
3586
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
×
UNCOV
3587
    if (pIter == NULL) break;
×
UNCOV
3588
    if (!mndIsDnodeOnline(pDnode, curMs)) {
×
UNCOV
3589
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
3590
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
3591
      mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
×
UNCOV
3592
      sdbRelease(pMnode->pSdb, pDnode);
×
UNCOV
3593
      goto _OVER;
×
3594
    }
3595

UNCOV
3596
    sdbRelease(pMnode->pSdb, pDnode);
×
3597
  }
3598

UNCOV
3599
  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
×
UNCOV
3600
  if (pArray == NULL) {
×
3601
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3602
    if (terrno != 0) code = terrno;
×
3603
    goto _OVER;
×
3604
  }
3605

UNCOV
3606
  if (taosArrayGetSize(pArray) < 2) {
×
UNCOV
3607
    mInfo("no need to balance vgroup since dnode num less than 2");
×
UNCOV
3608
    code = 0;
×
3609
  } else {
UNCOV
3610
    code = mndBalanceVgroup(pMnode, pReq, pArray);
×
3611
  }
3612

UNCOV
3613
  auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);
×
3614

UNCOV
3615
_OVER:
×
UNCOV
3616
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
3617
    mError("failed to balance vgroup since %s", tstrerror(code));
×
3618
  }
3619

UNCOV
3620
  taosArrayDestroy(pArray);
×
UNCOV
3621
  tFreeSBalanceVgroupReq(&req);
×
UNCOV
3622
  TAOS_RETURN(code);
×
3623
}
3624

UNCOV
3625
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
×
3626

UNCOV
3627
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
×
UNCOV
3628
  for (int i = 0; i < pVgroup->replica; i++) {
×
UNCOV
3629
    if (pVgroup->vnodeGid[i].dnodeId == dnodeId) return true;
×
3630
  }
UNCOV
3631
  return false;
×
3632
}
3633

UNCOV
3634
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
×
3635
                                     STimeWindow tw) {
UNCOV
3636
  SCompactVnodeReq compactReq = {0};
×
UNCOV
3637
  compactReq.dbUid = pDb->uid;
×
UNCOV
3638
  compactReq.compactStartTime = compactTs;
×
UNCOV
3639
  compactReq.tw = tw;
×
UNCOV
3640
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
×
3641

UNCOV
3642
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
×
UNCOV
3643
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
×
UNCOV
3644
  if (contLen < 0) {
×
3645
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3646
    return NULL;
×
3647
  }
UNCOV
3648
  contLen += sizeof(SMsgHead);
×
3649

UNCOV
3650
  void *pReq = taosMemoryMalloc(contLen);
×
UNCOV
3651
  if (pReq == NULL) {
×
3652
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3653
    return NULL;
×
3654
  }
3655

UNCOV
3656
  SMsgHead *pHead = pReq;
×
UNCOV
3657
  pHead->contLen = htonl(contLen);
×
UNCOV
3658
  pHead->vgId = htonl(pVgroup->vgId);
×
3659

UNCOV
3660
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
×
3661
    taosMemoryFree(pReq);
×
3662
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
3663
    return NULL;
×
3664
  }
UNCOV
3665
  *pContLen = contLen;
×
UNCOV
3666
  return pReq;
×
3667
}
3668

UNCOV
3669
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
×
3670
                                        STimeWindow tw) {
UNCOV
3671
  int32_t      code = 0;
×
UNCOV
3672
  STransAction action = {0};
×
UNCOV
3673
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
3674

UNCOV
3675
  int32_t contLen = 0;
×
UNCOV
3676
  void   *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw);
×
UNCOV
3677
  if (pReq == NULL) {
×
3678
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3679
    if (terrno != 0) code = terrno;
×
3680
    TAOS_RETURN(code);
×
3681
  }
3682

UNCOV
3683
  action.pCont = pReq;
×
UNCOV
3684
  action.contLen = contLen;
×
UNCOV
3685
  action.msgType = TDMT_VND_COMPACT;
×
3686

UNCOV
3687
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
3688
    taosMemoryFree(pReq);
×
3689
    TAOS_RETURN(code);
×
3690
  }
3691

UNCOV
3692
  TAOS_RETURN(code);
×
3693
}
3694

UNCOV
3695
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
×
3696
                                    STimeWindow tw) {
UNCOV
3697
  TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw));
×
UNCOV
3698
  return 0;
×
3699
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc