• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.08
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndEncryptAlgr.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTopic.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "tmisce.h"
32

33
#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2
34
#define VGROUP_VER_NUMBER                VGROUP_VER_COMPAT_MOUNT_KEEP_VER
35
#define VGROUP_RESERVE_SIZE              60
36
// Since 3.3.6.32/3.3.8.6 the bytes that follow syncConfChangeVer in an encoded vgroup are:
// mountVgId (4) + keepVersion (8) + keepVersionTime (8) + VGROUP_RESERVE_SIZE (60) = 80
37
#define DLEN_AFTER_SYNC_CONF_CHANGE_VER 80
38

39
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
40
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
41
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
42
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
43

44
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
45
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
46
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
48

49
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
50
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
51
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
52
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
53
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
54

55
/**
 * Register everything the vgroup module contributes to an mnode: the SDB table callbacks,
 * the message handlers and the SHOW retrieval hooks.
 *
 * @param pMnode mnode that receives the registrations.
 * @return whatever sdbSetTable() returns for the vgroup table.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  SSdbTable vgroupTable = {
      .sdbType = SDB_VGROUP,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
      .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
      .insertFp = (SdbInsertFp)mndVgroupActionInsert,
      .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
      .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
      .validateFp = (SdbValidateFp)mndNewVgActionValidate,
  };

  // Response messages that are all routed to the transaction response processor,
  // listed in their original registration order.
  const int32_t transRspTypes[] = {
      TDMT_DND_CREATE_VNODE_RSP,
      TDMT_VND_ALTER_REPLICA_RSP,
      TDMT_VND_ALTER_CONFIG_RSP,
      TDMT_VND_ALTER_CONFIRM_RSP,
      TDMT_VND_SET_KEEP_VERSION_RSP,
      TDMT_VND_ALTER_HASHRANGE_RSP,
      TDMT_DND_DROP_VNODE_RSP,
      TDMT_VND_COMPACT_RSP,
      TDMT_VND_SCAN_RSP,
      TDMT_VND_DISABLE_WRITE_RSP,
      TDMT_SYNC_FORCE_FOLLOWER_RSP,
      TDMT_VND_ALTER_ELECTBASELINE_RSP,
      TDMT_DND_ALTER_VNODE_TYPE_RSP,
      TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP,
      TDMT_SYNC_CONFIG_CHANGE_RSP,
  };
  const int32_t numOfTransRsp = (int32_t)(sizeof(transRspTypes) / sizeof(transRspTypes[0]));
  for (int32_t idx = 0; idx < numOfTransRsp; ++idx) {
    mndSetMsgHandle(pMnode, transRspTypes[idx], mndTransProcessRsp);
  }

  // Vgroup management requests handled by this file.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  // SHOW support: one retrieve callback and one iterator-cancel callback per table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}
98

99
/**
 * Cleanup counterpart of mndInitVgroup(); the body is intentionally empty.
 *
 * @param pMnode unused.
 */
void mndCleanupVgroup(SMnode *pMnode) {
  // nothing to do
}
397,295✔
100

101
/**
 * Serialize a vgroup object into a newly allocated SDB raw record.
 *
 * Field order: vgId, createdTime, updateTime, version, hashBegin, hashEnd, dbName, dbUid,
 * isTsma, replica, one dnodeId per replica, syncConfChangeVer, mountVgId, keepVersion,
 * keepVersionTime, then VGROUP_RESERVE_SIZE reserved bytes.
 *
 * @param pVgroup vgroup to encode.
 * @return the raw record, or NULL on failure (terrno is left non-zero and the record is freed).
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default: any early jump to _OVER is reported as a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_SET_INT32(pRaw, pos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, pos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, pos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, pos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, pos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, pos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, pos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, pos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, pos, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, pos, pVgroup->replica, _OVER)
  for (int8_t r = 0; r < pVgroup->replica; ++r) {
    // Only the dnode id of each replica is persisted.
    SVnodeGid *pGid = &pVgroup->vnodeGid[r];
    SDB_SET_INT32(pRaw, pos, pGid->dnodeId, _OVER)
  }
  SDB_SET_INT32(pRaw, pos, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, pos, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, pos, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, pos, pVgroup->keepVersionTime, _OVER)
  SDB_SET_RESERVE(pRaw, pos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, pos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
143

144
/**
 * Rebuild a vgroup row from an SDB raw record.
 *
 * Accepts soft versions 1..VGROUP_VER_NUMBER. The fields after the replica list
 * (syncConfChangeVer, mountVgId, keepVersion, keepVersionTime, reserved bytes) are each read
 * only when enough bytes remain in the record. For records older than
 * VGROUP_VER_COMPAT_MOUNT_KEEP_VER the keep-version fields are reset to their defaults.
 *
 * @param pRaw raw record to decode.
 * @return the decoded row, or NULL on failure (terrno is left non-zero and the row is freed).
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;
  int8_t   sver = 0;

  // Pessimistic default: any early jump to _OVER is reported as a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver < 1 || sver > VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_GET_INT32(pRaw, pos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, pos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, pos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, pos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, pos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, pos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, pos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, pos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, pos, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, pos, &pVgroup->replica, _OVER)
  for (int8_t r = 0; r < pVgroup->replica; ++r) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[r];
    SDB_GET_INT32(pRaw, pos, &pGid->dnodeId, _OVER)
    // A single-replica vgroup is marked as leader right away.
    if (pVgroup->replica == 1) {
      pGid->syncState = TAOS_SYNC_STATE_LEADER;
    }
    pGid->snapSeq = -1;
  }

  // Each trailing field is optional: read it only if it fits in front of the reserved area.
  if (pos + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, pos, &pVgroup->syncConfChangeVer, _OVER)
  }

  // Bytes left after syncConfChangeVer; used below to recognise one legacy layout.
  int32_t tailLen = pRaw->dataLen - pos;
  if (pos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, pos, &pVgroup->mountVgId, _OVER)
  }
  if (pos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, pos, &pVgroup->keepVersion, _OVER)
  }
  if (pos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, pos, &pVgroup->keepVersionTime, _OVER)
  }
  if (pos + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_RESERVE(pRaw, pos, VGROUP_RESERVE_SIZE, _OVER)
  }

  if (sver < VGROUP_VER_COMPAT_MOUNT_KEEP_VER) {
    // Older record: discard whatever was read into the mount/keep-version fields.
    if (tailLen == DLEN_AFTER_SYNC_CONF_CHANGE_VER) {
      pVgroup->mountVgId = 0;
    }
    pVgroup->keepVersion = -1;
    pVgroup->keepVersionTime = 0;
  }

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRow;
  }

  mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
222

223
/**
 * Validate a raw vgroup record carried by a transaction: it must decode, and its vgId must
 * not be smaller than the current maximum vgroup id in the SDB.
 *
 * @param pMnode mnode owning the SDB.
 * @param pTrans transaction, used for its id in the error log.
 * @param pRaw   raw vgroup record to validate.
 * @return 0 when valid; terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when decoding fails;
 *         -1 when the vgroup id is already in use.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int     code = -1;

  SSdbRow *pRow = mndVgroupActionDecode(pRaw);
  if (pRow == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pSdb, SDB_VGROUP);
  if (maxVgId > pVgroup->vgId) {
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  } else {
    code = 0;
  }

_OVER:
  // The decoded row is only a temporary: run the delete hook on it, then free it.
  if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
254

255
/**
 * SDB insert hook for vgroups; only emits a trace line.
 *
 * @param pSdb    unused.
 * @param pVgroup row being inserted.
 * @return always 0.
 */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t ok = 0;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return ok;
}
259

260
/**
 * SDB delete hook for vgroups; only emits a trace line.
 *
 * @param pSdb    unused.
 * @param pVgroup row being deleted.
 * @return always 0.
 */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t ok = 0;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return ok;
}
264

265
/**
 * SDB update hook: fold a new vgroup row into the existing one.
 *
 * Persistent attributes are copied from pNew to pOld. For every replica of pNew that lives
 * on the same dnode as a replica of pOld, the runtime sync fields of the old replica are
 * carried over into pNew first, and then the whole vnodeGid array of pNew replaces pOld's.
 * The table/storage counters of pOld are also copied into pNew.
 *
 * @param pSdb unused.
 * @param pOld row currently stored in the SDB; updated in place.
 * @param pNew incoming row; its replica runtime fields and counters are overwritten from pOld.
 * @return always 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  // NOTE: pOld->replica takes the new value here, so the inner loop below is bounded by it.
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;

  for (int32_t n = 0; n < pNew->replica; ++n) {
    SVnodeGid *pIncoming = &pNew->vnodeGid[n];
    for (int32_t o = 0; o < pOld->replica; ++o) {
      SVnodeGid *pExisting = &pOld->vnodeGid[o];
      if (pIncoming->dnodeId != pExisting->dnodeId) continue;

      // Same dnode: keep the runtime state already known for this replica.
      pIncoming->syncState = pExisting->syncState;
      pIncoming->syncRestore = pExisting->syncRestore;
      pIncoming->syncCanRead = pExisting->syncCanRead;
      pIncoming->syncAppliedIndex = pExisting->syncAppliedIndex;
      pIncoming->syncCommitIndex = pExisting->syncCommitIndex;
      pIncoming->bufferSegmentUsed = pExisting->bufferSegmentUsed;
      pIncoming->bufferSegmentSize = pExisting->bufferSegmentSize;
      pIncoming->learnerProgress = pExisting->learnerProgress;
      pIncoming->snapSeq = pExisting->snapSeq;
      pIncoming->syncTotalIndex = pExisting->syncTotalIndex;
    }
  }

  // Counters flow the other way: old row -> new row.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);
  return 0;
}
304

305
/**
 * Acquire a vgroup object from the SDB by id.
 *
 * @param pMnode mnode owning the SDB.
 * @param vgId   vgroup id to look up.
 * @return the vgroup, or NULL; when the SDB reports TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is
 *         rewritten to TSDB_CODE_MND_VGROUP_NOT_EXIST.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pFound = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
313

314
/**
 * Hand a vgroup object back to the SDB via sdbRelease().
 *
 * @param pMnode  mnode owning the SDB.
 * @param pVgroup vgroup to release.
 */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
123,683,613✔
318

319
/**
 * Build a serialized create-vnode request for one dnode of a vgroup.
 *
 * The request copies the database configuration, the vgroup hash range and the list of
 * voter and learner replicas. As a side effect pVgroup->syncConfChangeVer is incremented
 * and the new value is sent as changeVersion.
 *
 * @param pMnode   mnode used to resolve dnodes and the encryption algorithm name.
 * @param pDnode   target dnode; it must host one of the vgroup's replicas.
 * @param pDb      database supplying the vnode configuration.
 * @param pVgroup  vgroup being created.
 * @param pContLen receives the length of the returned buffer on success.
 * @return a buffer allocated with taosMemoryMalloc(), or NULL on failure. NULL is returned
 *         when a replica's dnode cannot be acquired, when pDnode hosts no replica
 *         (terrno = TSDB_CODE_APP_ERROR) or when sizing/allocation/serialization fails.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};
  createReq.vgId = pVgroup->vgId;
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.vgVersion = pVgroup->version;
  createReq.numOfStables = pDb->cfg.numOfStables;
  createReq.buffer = pDb->cfg.buffer;
  createReq.pageSize = pDb->cfg.pageSize;
  createReq.pages = pDb->cfg.pages;
  createReq.cacheLastSize = pDb->cfg.cacheLastSize;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  createReq.ssChunkSize = pDb->cfg.ssChunkSize;
  createReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  createReq.ssCompact = pDb->cfg.ssCompact;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.strict = pDb->cfg.strict;
  createReq.cacheLast = pDb->cfg.cacheLast;
  createReq.replica = 0;
  createReq.learnerReplica = 0;
  createReq.selfIndex = -1;
  createReq.learnerSelfIndex = -1;
  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;
  createReq.hashMethod = pDb->cfg.hashMethod;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions;
  createReq.isTsma = pVgroup->isTsma;
  createReq.pTsma = pVgroup->pTsma;
  createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  createReq.walRetentionSize = pDb->cfg.walRetentionSize;
  createReq.walRollPeriod = pDb->cfg.walRollPeriod;
  createReq.walSegmentSize = pDb->cfg.walSegmentSize;
  createReq.sstTrigger = pDb->cfg.sstTrigger;
  createReq.hashPrefix = pDb->cfg.hashPrefix;
  createReq.hashSuffix = pDb->cfg.hashSuffix;
  createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
  // Bump the vgroup's config-change version once; nothing below modifies it again.
  createReq.changeVersion = ++(pVgroup->syncConfChangeVer);
  memset(createReq.encryptAlgrName, 0, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pDb->cfg.encryptAlgorithm > 0) {
    mndGetEncryptOsslAlgrNameById(pMnode, pDb->cfg.encryptAlgorithm, createReq.encryptAlgrName);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    int32_t    isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Voters and learners are collected into two separate, densely packed arrays.
    SReplica *pReplica = isVoter ? &createReq.replicas[createReq.replica]
                                 : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // Record the position of the target dnode inside its own array, then advance that array.
    if (isVoter) {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (pDnode->id == pVgid->dnodeId) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    // The target dnode is not a member of this vgroup.
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    // Label learners as such, matching the alter-replica builder's log format.
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  if (code < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
452

453
/**
 * Build an alter-vnode-config message: an SMsgHead followed by the serialized
 * SAlterVnodeConfigReq filled from the database configuration.
 *
 * @param pMnode   unused.
 * @param pDb      database supplying the configuration values.
 * @param pVgroup  target vgroup; its id and version go into the message.
 * @param pContLen receives the total message length (header + body) on success.
 * @return a buffer allocated with taosMemoryMalloc(), or NULL with
 *         terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // First pass with a NULL buffer only computes the encoded body size.
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The header carries the total length and the vgroup id in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts after the header, so only bodyLen bytes are available to the serializer
  // (passing contLen here would overstate the buffer by sizeof(SMsgHead)).
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
503

504
/**
 * Build a serialized alter-vnode-replica request addressed to one dnode of a vgroup.
 *
 * As a side effect pVgroup->syncConfChangeVer is incremented and sent as changeVersion.
 *
 * @param pMnode   mnode used to resolve the replicas' dnodes.
 * @param pDb      database supplying the strict flag.
 * @param pVgroup  vgroup whose replica set is described.
 * @param dnodeId  id of the dnode the request is addressed to.
 * @param pContLen receives the length of the returned buffer on success.
 * @return a buffer allocated with taosMemoryMalloc(), or NULL on failure.
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.strict = pDb->cfg.strict;
  alterReq.replica = 0;
  alterReq.learnerReplica = 0;
  alterReq.selfIndex = -1;
  alterReq.learnerSelfIndex = -1;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pMember = &pVgroup->vnodeGid[v];
    int32_t    isVoter = (pMember->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Claim the next free slot in the voter or learner array.
    SReplica *pSlot = NULL;
    if (isVoter) {
      pSlot = &alterReq.replicas[alterReq.replica];
      alterReq.replica++;
    } else {
      pSlot = &alterReq.learnerReplicas[alterReq.learnerReplica];
      alterReq.learnerReplica++;
    }

    SDnodeObj *pMemberDnode = mndAcquireDnode(pMnode, pMember->dnodeId);
    if (pMemberDnode == NULL) return NULL;

    pSlot->id = pMemberDnode->id;
    pSlot->port = pMemberDnode->port;
    memcpy(pSlot->fqdn, pMemberDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pMemberDnode);

    // NOTE(review): the self index is the position in pVgroup->vnodeGid, not the position
    // inside the voter/learner array -- confirm that this is what the receiver expects.
    if (dnodeId == pMember->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = v;
      } else {
        alterReq.learnerSelfIndex = v;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t k = 0; k < alterReq.replica; ++k) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, k, alterReq.replicas[k].fqdn, alterReq.replicas[k].port);
  }
  for (int32_t k = 0; k < alterReq.learnerReplica; ++k) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, k, alterReq.learnerReplicas[k].fqdn,
          alterReq.learnerReplicas[k].port);
  }

  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    // The addressed dnode is not a member of this vgroup.
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t reqLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (reqLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pBuf, reqLen, &alterReq) >= 0) {
    *pContLen = reqLen;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
586

587
/**
 * Build a serialized check-learner-catchup request addressed to one dnode of a vgroup.
 *
 * The request lists the vgroup's voter and learner replicas and the position of the
 * addressed dnode. It is encoded with tSerializeSAlterVnodeReplicaReq().
 * NOTE(review): presumably SCheckLearnCatchupReq is an alias of SAlterVnodeReplicaReq --
 * confirm against the message definitions.
 *
 * @param pMnode   mnode used to resolve the replicas' dnodes.
 * @param pDb      database supplying the strict flag.
 * @param pVgroup  vgroup whose replica set is described.
 * @param dnodeId  id of the dnode the request is addressed to.
 * @param pContLen receives the length of the returned buffer on success.
 * @return a buffer allocated with taosMemoryMalloc(), or NULL on failure.
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {
      .vgId = pVgroup->vgId,
      .strict = pDb->cfg.strict,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
  };

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    int32_t    isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Claim the next free slot in the voter or learner array.
    SReplica *pReplica = NULL;
    if (isVoter) {
      pReplica = &req.replicas[req.replica];
      req.replica++;
    } else {
      pReplica = &req.learnerReplicas[req.learnerReplica];
      req.learnerReplica++;
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = v;
      } else {
        req.learnerSelfIndex = v;
      }
    }
  }

  // Log under this request's own name so it is not mistaken for an alter-replica request.
  mInfo("vgId:%d, build check learner catchup req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d "
        "strict:%d",
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    // The addressed dnode is not a member of this vgroup.
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize check learner catchup req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
664

665
/**
 * Build a serialized disable-vnode-write request (disable = 1) for a vgroup.
 *
 * @param pMnode   unused.
 * @param pDb      unused.
 * @param vgId     id of the vgroup whose writes are to be disabled.
 * @param pContLen receives the length of the returned buffer on success.
 * @return a buffer allocated with taosMemoryMalloc(), or NULL with
 *         terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq disableReq = {0};
  disableReq.vgId = vgId;
  disableReq.disable = 1;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // First pass with a NULL buffer only computes the encoded size.
  int32_t reqLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
  if (reqLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(reqLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, reqLen, &disableReq) >= 0) {
    *pContLen = reqLen;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
693

694
// Builds the serialized SAlterVnodeHashRangeReq that moves srcVgId's hash range to
// the range stored in pVgroup (dstVgId/hashBegin/hashEnd are copied from it).
// Side effect: pVgroup->syncConfChangeVer is pre-incremented and the new value is
// sent as changeVersion, even if serialization later fails.
// Returns a taosMemoryMalloc'ed buffer with its length in *pContLen, or NULL with
// terrno = TSDB_CODE_OUT_OF_MEMORY on failure. pMnode is not referenced here.
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq alterReq = {0};
  alterReq.srcVgId = srcVgId;
  alterReq.dstVgId = pVgroup->vgId;
  alterReq.hashBegin = pVgroup->hashBegin;
  alterReq.hashEnd = pVgroup->hashEnd;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // The result of the NULL-buffer call is used as the allocation size.
  const int32_t encodedLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
  void         *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeHashRangeReq(pBuf, encodedLen, &alterReq) >= 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  // Log with the serializer's error text before terrno is overwritten below.
  mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
726

727
// Builds the serialized SDropVnodeReq for dropping pVgroup's vnode on pDnode.
// The request carries the dnode id, vgroup id, and the db name/uid copied from pDb.
// Returns a taosMemoryMalloc'ed buffer with its length in *pContLen, or NULL with
// terrno = TSDB_CODE_OUT_OF_MEMORY on failure. pMnode is not referenced here.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {0};
  dropReq.dbUid = pDb->uid;
  dropReq.vgId = pVgroup->vgId;
  dropReq.dnodeId = pDnode->id;
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", dropReq.vgId);

  // The result of the NULL-buffer call is used as the allocation size.
  const int32_t encodedLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  void         *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, encodedLen, &dropReq) >= 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  // Log with the serializer's error text before terrno is overwritten below.
  mError("vgId:%d, failed to serialize drop vnode req,since %s", dropReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
756

757
// sdbTraverse callback: clears the per-dnode vnode and other-node counters.
// Always returns true; p1..p3 are unused.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pTarget = (SDnodeObj *)pObj;
  pTarget->numOfOtherNodes = 0;
  pTarget->numOfVnodes = 0;
  return true;
}
763

764
// sdbTraverse callback used by mndBuildDnodesArray.
//   p1: SArray of SDnodeObj that receives a copy of every eligible dnode
//   p2: pointer to the dnode id that must be excluded
//   p3: optional SArray of int32_t dnode ids; when non-NULL only listed dnodes are
//       considered, and an empty list excludes every dnode (TS-6191)
// For a considered dnode, numOfVnodes and memUsed are refreshed and numOfOtherNodes
// is bumped when it hosts an mnode. The dnode is pushed only if it is online and
// has numOfSupportVnodes > 0. Returns false only when the push fails.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj    *pDnode = pObj;
  SArray       *pOut = p1;
  const int32_t skipId = *(int32_t *)p2;
  SArray       *pAllowed = p3;

  if (pDnode->id == skipId) {
    return true;
  }

  if (pAllowed != NULL) {
    int32_t nAllowed = taosArrayGetSize(pAllowed);
    if (nAllowed <= 0) {
      return true;  // TS-6191
    }

    bool listed = false;
    for (int32_t k = 0; k < nAllowed && !listed; ++k) {
      listed = (*(int32_t *)taosArrayGet(pAllowed, k) == pDnode->id);
    }
    if (!listed) {
      return true;
    }
  }

  const int64_t nowMs = taosGetTimestampMs();
  const bool    online = mndIsDnodeOnline(pDnode, nowMs);
  const bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);

  if (isMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (!online || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
810

811
// Returns true when dnodeId appears in dnodeList (an SArray of int32_t ids).
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  const int32_t total = taosArrayGetSize(dnodeList);
  int32_t       pos = 0;
  while (pos < total && *(int32_t *)TARRAY_GET_ELEM(dnodeList, pos) != dnodeId) {
    ++pos;
  }
  return pos < total;
}
821

822
#ifdef TD_ENTERPRISE
823
// Load score of a dnode for the dual-replica selection: (vnodes + otherNodes * ratio
// + additionDnodes) / numOfSupportVnodes. The sign is flipped for dnodes that
// already host vnodes, so their score is never greater than zero.
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;
  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
828

829
// Comparator over SDnodeObj: orders by mndGetDnodeScore1 (ratio 0.9) and breaks
// score ties by dnode id. Returns -1, 0 or 1.
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  const float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  const float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);

  if (lhs != rhs) {
    return (lhs > rhs) ? 1 : -1;
  }
  // Equal scores: fall back to the id so the ordering is total.
  return (pDnode1->id > pDnode2->id) - (pDnode1->id < pDnode2->id);
}
840

841
// sdbTraverse callback: refreshes numOfVnodes, bumps numOfOtherNodes for dnodes
// that host an mnode, and appends a copy of every dnode with numOfSupportVnodes > 0
// to the SArray passed in p1. Returns false only when the push fails.
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;

  const bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
857

858
// TS-6191
859
// When grantCheckDualReplicaDnodes() is true, builds in *ppDnodeList the candidate
// dnodes (SArray of SDnodeObj copies) that vgroup allocation may use; otherwise
// returns 0 and leaves *ppDnodeList untouched.
//   nDnodes:   initial capacity for the candidate array
//   dnodeList: optional SArray of int32_t dnode ids supplied by the caller
// Candidates are sorted with mndCompareDnodeVnodes1 and then trimmed:
//   - no dnodeList: keep at most the first two candidates;
//   - with dnodeList: if at least two leading candidates already host vnodes, keep
//     only the first two; then drop every candidate not in dnodeList; if exactly one
//     candidate hosted vnodes and it did not survive the filter, drop the last one.
// Returns 0, or terrno when the array cannot be allocated. Once assigned,
// *ppDnodeList is left for the caller to destroy.
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCand = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCand == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCand;

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCand, NULL, NULL);

  const int32_t nCand = taosArrayGetSize(pCand);
  if (nCand <= 0) {
    TAOS_RETURN(code);
  }
  if (nCand > 1) {
    taosArraySort(pCand, (__compar_fn_t)mndCompareDnodeVnodes1);
  }

  const int32_t nRequested = taosArrayGetSize(dnodeList);
  if (nRequested <= 0) {
    if (nCand > 2) {
      taosArrayRemoveBatch(pCand, 2, nCand - 2, NULL);
    }
    TAOS_RETURN(code);
  }

  // Count the leading run of candidates that already host vnodes.
  int32_t nBusy = 0;
  while (nBusy < nCand && ((SDnodeObj *)TARRAY_GET_ELEM(pCand, nBusy))->numOfVnodes > 0) {
    ++nBusy;
  }

  int32_t soleBusyId = -1;
  if (nBusy == 1) {
    soleBusyId = ((SDnodeObj *)TARRAY_GET_ELEM(pCand, 0))->id;
  } else if (nBusy >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCand, 2, nCand - 2, NULL);
  }

  // Drop every candidate the caller did not list; only advance when nothing was removed.
  int32_t idx = 0;
  while (idx < TARRAY_SIZE(pCand)) {
    SDnodeObj *pDnode = taosArrayGet(pCand, idx);
    if (isDnodeInList(dnodeList, pDnode->id)) {
      ++idx;
    } else {
      taosArrayRemove(pCand, idx);
    }
  }

  if (nBusy == 1) {
    SDnodeObj *pHead = taosArrayGet(pCand, 0);
    if (pHead != NULL && pHead->id != soleBusyId) {  // the first dnode is not in dnodeList, remove the last element
      taosArrayRemove(pCand, taosArrayGetSize(pCand) - 1);
    }
  }

  TAOS_RETURN(code);
}
917
#endif
918

919
// Returns a new SArray of SDnodeObj copies for the dnodes mndBuildDnodesArrayFp
// accepts as vnode hosts, skipping exceptDnodeId. A non-empty dnodeList restricts
// the result to the listed ids; in TD_ENTERPRISE builds the list produced by
// mndBuildNodesCheckDualReplica, when present, takes precedence over dnodeList.
// Per-dnode counters are reset before the traversal that fills the array.
// Returns NULL on failure (terrno = TSDB_CODE_OUT_OF_MEMORY when the array cannot
// be allocated).
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb         *pSdb = pMnode->pSdb;
  const int32_t nDnodes = mndGetDnodeSize(pMnode);
  SArray       *pDualList = NULL;

  SArray *pResult = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // An empty caller list is treated the same as no list at all.
  SArray *pUserList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;
#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, nDnodes, pUserList, &pDualList) != 0) {
    taosArrayDestroy(pResult);
    return NULL;
  }
#endif
  SArray *pFilter = (pDualList != NULL) ? pDualList : pUserList;

  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId, pFilter);

  const int32_t nBuilt = (int32_t)taosArrayGetSize(pResult);
  mDebug("build %d dnodes array", nBuilt);
  for (int32_t i = 0; i < nBuilt; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pResult, i);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  taosArrayDestroy(pDualList);
  return pResult;
}
950

951
// Three-way comparison of two dnode ids: returns -1, 0 or 1.
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  const int32_t lhs = *dnode1Id;
  const int32_t rhs = *dnode2Id;
  return (lhs > rhs) - (lhs < rhs);
}
957

958
// Load score of a dnode: (vnodes + otherNodes * ratio + additionDnodes) divided by
// numOfSupportVnodes.
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float weightedLoad = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = weightedLoad / pDnode->numOfSupportVnodes;
  return score;
}
962

963
// Comparator over SDnodeObj by mndGetDnodeScore (ratio 0.9): returns 1 when the
// first score is greater, 0 when the scores are equal, -1 otherwise.
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  const float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  const float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);
  if (lhs > rhs) {
    return 1;
  }
  return (lhs == rhs) ? 0 : -1;
}
971

972
// Sorts pVgroup->vnodeGid[0 .. replica-1] in ascending dnodeId order, in place,
// with a bubble sort (adjacent entries are swapped only when strictly out of order).
void mndSortVnodeGid(SVgObj *pVgroup) {
  // After each pass the largest remaining dnodeId sits at index `last`.
  for (int32_t last = pVgroup->replica - 1; last > 0; --last) {
    for (int32_t k = 0; k < last; ++k) {
      if (pVgroup->vnodeGid[k].dnodeId > pVgroup->vnodeGid[k + 1].dnodeId) {
        TSWAP(pVgroup->vnodeGid[k], pVgroup->vnodeGid[k + 1]);
      }
    }
  }
}
2,358,281✔
981

982
// Assigns pVgroup->replica dnodes to pVgroup from pArray (SArray of SDnodeObj).
// pArray is sorted by mndCompareDnodeVnodes and its first `replica` entries are
// used. For each chosen entry the function checks vnode capacity and memory, then
// updates that entry's memUsed and numOfVnodes.
// On success vnodeGid[] is filled (LEADER when replica == 1, FOLLOWER otherwise),
// sorted by dnode id, and 0 is returned. Otherwise returns
// TSDB_CODE_MND_NO_ENOUGH_DNODES, TSDB_CODE_MND_NO_ENOUGH_VNODES or
// TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE; entries updated before the failure are not
// rolled back.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  const int32_t nCand = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < nCand; ++i) {
    SDnodeObj *pCand = taosArrayGet(pArray, i);
    mDebug("dnode:%d, score:%f", pCand->id, mndGetDnodeScore(pCand, 0, 0.9));
  }

  if (nCand < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, nCand,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    // The dnode must keep strictly positive memory after hosting this vgroup.
    const int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1031

1032
// Initializes *pVgroup as a single-replica TSMA vgroup of pDb and assigns it a dnode.
// The vgroup gets the next vgId from the SDB, creation/update time = now, version 1,
// isTsma = 1 and WAL keep-version disabled (keepVersion = -1).
// Returns 0 on success; TSDB_CODE_MND_RETURN_VALUE_NULL (or terrno when set) if the
// dnode array cannot be built; -1 if no suitable dnode is available.
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  // Release the dnode array on both outcomes; the failure path used to leak it.
  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  taosArrayDestroy(pArray);
  if (code != 0) return -1;

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1058

1059
// Allocates and initializes pDb->cfg.numOfVgroups vgroups for pDb.
// The uint32 hash space [0, UINT32_MAX] is split into equal intervals, with the last
// vgroup extended to UINT32_MAX. Vgroup ids start at max(sdbGetMaxId, 2). Each vgroup
// gets pDb->cfg.replications replicas placed by mndGetAvailableDnode on the dnodes
// returned by mndBuildDnodesArray (optionally restricted by dnodeList).
// On success stores the calloc'ed array in *ppVgroups and returns 0. On failure the
// array is freed, *ppVgroups is left untouched and an error code is returned.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    code = terrno;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time (previously read from element 0 of the array).
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      // The last vgroup absorbs the remainder of the integer division.
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1124

1125
// Builds the endpoint set of pVgroup: one entry (fqdn, port) per replica whose dnode
// can be acquired. inUse is set to the index at which a LEADER or ASSIGNED_LEADER
// replica is about to be added. The set is passed through epsetSort before it is
// returned. Replicas whose dnode cannot be acquired are skipped.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      switch (pVgid->syncState) {
        case TAOS_SYNC_STATE_LEADER:
        case TAOS_SYNC_STATE_ASSIGNED_LEADER:
          // numOfEps is the index the entry added below will occupy.
          epset.inUse = epset.numOfEps;
          break;
        default:
          break;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  epsetSort(&epset);
  return epset;
}
1146

1147
// Looks up the vgroup with id vgId and builds its endpoint set: one entry (fqdn,
// port) per replica whose dnode can be acquired, with inUse pointing at the index of
// a LEADER or ASSIGNED_LEADER replica. Returns a zeroed set when the vgroup cannot
// be acquired. The set is returned in replica order (no epsetSort is applied here).
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet  epset = {0};
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      const bool leads =
          (pVgid->syncState == TAOS_SYNC_STATE_LEADER) || (pVgid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leads) {
        // numOfEps is the index the entry added below will occupy.
        epset.inUse = epset.numOfEps;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1171

1172
// Show-retrieve handler: fills pBlock with at most `rows` vgroup rows.
// Vgroups are iterated with sdbFetch over SDB_VGROUP, resuming from pShow->pIter.
// When pShow->db is non-empty, only vgroups whose dbUid matches that db are emitted.
// Columns written per row, in order: vgId, db name, numOfTables, then for each of
// 4 replica slots (dnodeId, role string, applied/commit string; all NULL for slots
// >= replica), then isReady, cacheUsage, numOfCachedTables, isTsma, mountVgId,
// keepVersion, keepVersionTime.
// Returns the number of rows written (also added to pShow->numOfRows); when `code`
// is non-zero at _OVER the error is logged and returned through TAOS_RETURN.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
259,176✔
1173
  SMnode   *pMnode = pReq->info.node;
259,176✔
1174
  SSdb     *pSdb = pMnode->pSdb;
259,176✔
1175
  int32_t   numOfRows = 0;
259,176✔
1176
  SVgObj   *pVgroup = NULL;
259,176✔
1177
  SDbObj   *pVgDb = NULL;
259,176✔
1178
  int32_t   cols = 0;
259,176✔
1179
  int64_t   curMs = taosGetTimestampMs();
259,176✔
1180
  int32_t   code = 0, lino = 0;
259,176✔
1181
  SDbObj   *pDb = NULL;
259,176✔
1182
  // NOTE(review): pVgDb, pIterDb, objFName, showAll, showIter and dbUid are not
  // referenced directly in this function; presumably the MND_SHOW_CHECK_* macros
  // use some of them - confirm before removing any.
  SUserObj *pUser = NULL;
259,176✔
1183
  SDbObj   *pIterDb = NULL;
259,176✔
1184
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
259,176✔
1185
  bool      showAll = false, showIter = false;
259,176✔
1186
  int64_t   dbUid = 0;
259,176✔
1187

1188
  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_VGROUPS, PRIV_OBJ_DB, 0, _OVER);
259,176✔
1189

1190
  // Optional db filter: if the named db cannot be acquired, no rows are produced
  // (code is still 0 at _OVER).
  if (strlen(pShow->db) > 0) {
259,176✔
1191
    pDb = mndAcquireDb(pMnode, pShow->db);
225,146✔
1192
    if (pDb == NULL) {
225,146✔
1193
      goto _OVER;
×
1194
    }
1195
  }
1196

1197
  while (numOfRows < rows) {
1,410,627✔
1198
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
1,410,627✔
1199
    if (pShow->pIter == NULL) break;
1,410,627✔
1200

1201
    // Skip vgroups that belong to a different db than the one requested.
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
1,152,206✔
1202
      sdbRelease(pSdb, pVgroup);
367,957✔
1203
      continue;
367,957✔
1204
    }
1205

1206
    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VGROUPS, _OVER);
784,249✔
1207

1208
    cols = 0;
782,286✔
1209
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1210
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1211

1212
    // Convert the stored full db name into a varstr holding only the db part.
    SName name = {0};
782,286✔
1213
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
782,286✔
1214
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
782,286✔
1215
    if (code != 0) {
782,286✔
1216
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
×
1217
      sdbRelease(pSdb, pVgroup);
×
1218
      // sdbCancelFetch(pSdb, pShow->pIter);
1219
      goto _OVER;
×
1220
    }
1221
    (void)tNameGetDbName(&name, varDataVal(db));
782,286✔
1222
    varDataSetLen(db, strlen(varDataVal(db)));
782,286✔
1223

1224
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1225
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1226

1227
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1228
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1229

1230
    // Per-row state collected while walking the replicas; it drives the isReady
    // column computed after the replica loop.
    bool isReady = false;
782,286✔
1231
    bool isLeaderRestored = false;
782,286✔
1232
    bool hasFollowerRestored = false;
782,286✔
1233
    ESyncState leaderState = TAOS_SYNC_STATE_OFFLINE;
782,286✔
1234
    // default 3 replica, add 1 replica if move vnode
1235
    // Each slot occupies three columns: dnode id, role, applied/commit progress.
    for (int32_t i = 0; i < 4; ++i) {
3,911,430✔
1236
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3,129,144✔
1237
      if (i < pVgroup->replica) {
3,129,144✔
1238
        int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
1,643,700✔
1239
        COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);
1,643,700✔
1240

1241
        bool       exist = false;
1,643,700✔
1242
        bool       online = false;
1,643,700✔
1243
        SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
1,643,700✔
1244
        if (pDnode != NULL) {
1,643,700✔
1245
          exist = true;
1,643,700✔
1246
          online = mndIsDnodeOnline(pDnode, curMs);
1,643,700✔
1247
          mndReleaseDnode(pMnode, pDnode);
1,643,700✔
1248
        }
1249

1250
        // Role text: "dropping" when the dnode cannot be acquired, "offline" when it
        // is not online, otherwise syncStr() of the replica's sync state.
        char buf1[20] = {0};
1,643,700✔
1251
        char role[20] = "offline";
1,643,700✔
1252
        if (!exist) {
1,643,700✔
1253
          tstrncpy(role, "dropping", sizeof(role));
×
1254
        } else if (online) {
1,643,700✔
1255
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
1,626,222✔
1256
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,030,937✔
1257
            if (pVgroup->vnodeGid[i].syncRestore) {
595,285✔
1258
              isLeaderRestored = true;
521,063✔
1259
            }
1260
          } else if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_FOLLOWER) {
1,030,937✔
1261
            if (pVgroup->vnodeGid[i].syncRestore) {
863,770✔
1262
              hasFollowerRestored = true;
487,857✔
1263
            }
1264
          }
1265
          if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
1,626,222✔
1266
              pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER)
1,030,937✔
1267
            leaderState = pVgroup->vnodeGid[i].syncState;
595,285✔
1268
          snprintf(role, sizeof(role), "%s", syncStr(pVgroup->vnodeGid[i].syncState));
1,626,222✔
1269
        }
1270
        // cols was already advanced past the dnode-id column, so pSchemas[cols] is
        // the schema entry of the role column fetched on the next statement.
        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
1,643,700✔
1271

1272
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,643,700✔
1273
        COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);
1,643,700✔
1274

1275
        // Progress text. Four formats, depending on whether the replica is a learner
        // and whether snapSeq lies strictly between 0 and SYNC_SNAPSHOT_SEQ_END:
        //   learner + snap : applied/commit/total(snap:N)(learner:P)
        //   learner        : applied/commit/total(learner:P)
        //   snap           : applied/commit(snap:N)
        //   otherwise      : applied/commit
        char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
1,643,700✔
1276
        char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};
1,643,700✔
1277

1278
        if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER &&
1,643,700✔
1279
            (pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END)) {
41,348✔
1280
          if (pDb != NULL) {
×
UNCOV
1281
            mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
×
1282
          } else {
1283
            mInfo("db:null, learner progress:%d", pVgroup->vnodeGid[i].learnerProgress);
×
1284
          }
1285

UNCOV
1286
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)",
×
UNCOV
1287
                   pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
×
UNCOV
1288
                   pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].snapSeq,
×
UNCOV
1289
                   pVgroup->vnodeGid[i].learnerProgress);
×
1290
        } else if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
1,643,700✔
1291
          if (pDb != NULL) {
41,348✔
1292
            mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
39,708✔
1293
          } else {
1294
            mInfo("db:null, learner progress:%d", pVgroup->vnodeGid[i].learnerProgress);
1,640✔
1295
          }
1296

1297
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)",
165,392✔
1298
                   pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
82,696✔
1299
                   pVgroup->vnodeGid[i].syncTotalIndex, pVgroup->vnodeGid[i].learnerProgress);
82,696✔
1300
        } else if (pVgroup->vnodeGid[i].snapSeq > 0 && pVgroup->vnodeGid[i].snapSeq < SYNC_SNAPSHOT_SEQ_END) {
1,602,352✔
1301
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)",
684✔
1302
                   pVgroup->vnodeGid[i].syncAppliedIndex, pVgroup->vnodeGid[i].syncCommitIndex,
456✔
1303
                   pVgroup->vnodeGid[i].snapSeq);
228✔
1304
        } else {
1305
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pVgroup->vnodeGid[i].syncAppliedIndex,
1,602,124✔
1306
                   pVgroup->vnodeGid[i].syncCommitIndex);
1,602,124✔
1307
        }
1308

1309
        STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);
1,643,700✔
1310

1311
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,643,700✔
1312
        COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
1,643,700✔
1313
      } else {
1314
        // Unused replica slot: all three of its columns are set to NULL.
        colDataSetNULL(pColInfo, numOfRows);
1,485,444✔
1315
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,485,444✔
1316
        colDataSetNULL(pColInfo, numOfRows);
1,485,444✔
1317
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,485,444✔
1318
        colDataSetNULL(pColInfo, numOfRows);
1,485,444✔
1319
      }
1320
    }
1321

1322
    // isReady rules:
    //   replica >= 3: a restored leader and at least one restored follower;
    //   replica == 2: same as above when led by LEADER, a restored leader alone
    //                 when led by ASSIGNED_LEADER;
    //   otherwise:    a restored leader.
    if (pVgroup->replica >= 3) {
782,286✔
1323
      if (isLeaderRestored && hasFollowerRestored) isReady = true;
338,915✔
1324
    } else if (pVgroup->replica == 2) {
443,371✔
1325
      if (leaderState == TAOS_SYNC_STATE_LEADER) {
183,584✔
1326
        if (isLeaderRestored && hasFollowerRestored) isReady = true;
85,390✔
1327
      } else if (leaderState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
98,194✔
UNCOV
1328
        if (isLeaderRestored) isReady = true;
×
1329
      }
1330
    } else {
1331
      if (isLeaderRestored) isReady = true;
259,787✔
1332
    }
1333
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1334
    COL_DATA_SET_VAL_GOTO((const char *)&isReady, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1335

1336
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1337
    int64_t cacheUsage = (int64_t)pVgroup->cacheUsage;
782,286✔
1338
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1339

1340
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1341
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1342

1343
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1344
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1345

1346
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1347
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1348

1349
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1350
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersion, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1351

1352
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,286✔
1353
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersionTime, false, pVgroup, pShow->pIter, _OVER);
782,286✔
1354

1355
    numOfRows++;
782,286✔
1356
    sdbRelease(pSdb, pVgroup);
782,286✔
1357
  }
1358
// Common exit: release the user/db handles, then either report the error or
// publish the number of rows produced.
_OVER:
259,176✔
1359
  if (pUser) mndReleaseUser(pMnode, pUser);
259,176✔
1360
  if (pDb != NULL) {
259,176✔
1361
    mndReleaseDb(pMnode, pDb);
225,146✔
1362
  }
1363
  if (code != 0) {
259,176✔
UNCOV
1364
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
×
UNCOV
1365
    TAOS_RETURN(code);
×
1366
  }
1367

1368
  pShow->numOfRows += numOfRows;
259,176✔
1369
  return numOfRows;
259,176✔
1370
}
1371

1372
// Cancels an in-progress SDB_VGROUP fetch identified by pIter.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
755✔
1376

1377
// sdbTraverse callback over vgroups: adds to *(int32_t *)p2 the number of replicas
// of this vgroup that are placed on dnode *(int32_t *)p1. Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVgroup = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int32_t      *pCount = (int32_t *)p2;

  int32_t v = 0;
  while (v < pVgroup->replica) {
    if (pVgroup->vnodeGid[v].dnodeId == wantedDnode) {
      *pCount += 1;
    }
    ++v;
  }

  return true;
}
1390

1391
// Returns how many vnode replicas, across all vgroups in the SDB, are placed on
// the dnode with id dnodeId.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t total = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);
  return total;
}
1396

1397
// Returns the memory, in bytes, attributed to one vnode of pVgroup according to its
// db config: buffer * 1 MiB + pages * pageSize * 1 KiB, plus cacheLastSize * 1 MiB
// when cfg.cacheLast > 0.
// When pDbInput is NULL the db is acquired by pVgroup->dbName and released before
// returning; if no db is available the result is 0.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool acquiredHere = (pDbInput == NULL);
  SDbObj    *pDb = acquiredHere ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;

  int64_t totalBytes = 0;
  if (pDb != NULL) {
    const int64_t buffer = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    const int64_t cache = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    const int64_t cacheLast = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;

    totalBytes = buffer + cache;
    if (pDb->cfg.cacheLast > 0) {
      totalBytes += cacheLast;
    }
    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           buffer, cache, cacheLast);
  }

  // Only release what this function acquired itself.
  if (acquiredHere) {
    mndReleaseDb(pMnode, pDb);
  }
  return totalBytes;
}
1421

1422
// Traversal callback used with sdbTraverse(): for every replica slot of the
// visited vgroup (pObj) that sits on the dnode whose id is behind p1, adds
// mndGetVgroupMemory() of that vgroup to the int64_t accumulator behind p2.
// p3 is not used. Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVg = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int64_t      *pAccum = (int64_t *)p2;

  int32_t slot = 0;
  while (slot < pVg->replica) {
    if (pVg->vnodeGid[slot].dnodeId == wantedDnode) {
      *pAccum += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
    ++slot;
  }

  return true;
}
1435

1436
// Returns the summed mndGetVgroupMemory() of all vnode replicas placed on
// dnode `dnodeId`, obtained by running mndGetVnodeMemroyFp over every
// SDB_VGROUP object.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  int64_t total = 0;
  SSdb   *pSdb = pMnode->pSdb;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);

  return total;
}
1441

1442
// Writes the estimated time needed to apply `applyCount` pending entries at
// `rate` into `restoreStr` as "hours:minutes:seconds" (no zero padding).
//
// The quotient applyCount / rate is divided by 1000 before being split into
// h:m:s. NOTE(review): this implies rate is entries per millisecond - confirm
// against the code that fills appliedRate.
//
// "0:0:0" is written when no meaningful estimate exists: rate is zero,
// negative or NaN, or applyCount is not positive. An estimate too large for
// int64_t is clamped to INT64_MAX instead of being converted out of range,
// which is undefined behaviour in C.
//
// restoreStr      output buffer; nothing is written when it is NULL
// restoreStrSize  size of restoreStr in bytes; output is truncated to fit
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  if (restoreStr == NULL || restoreStrSize == 0) {
    return;
  }

  // !(rate > 0) is also true for NaN, which a plain `rate <= 0` would miss
  if (!(rate > 0) || applyCount <= 0) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  double  costMs = (double)applyCount / rate;
  int64_t costTime = (costMs >= (double)INT64_MAX) ? INT64_MAX : (int64_t)costMs;

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1456

1457
// Show-retrieve handler that emits one row per vnode replica of every vgroup
// reached through pShow->pIter. Columns are written in this order:
//   dnode id, vgroup id, db name, sync state, role time, start time,
//   sync restore flag, estimated restore time, unapplied count,
//   buffer segment used, buffer segment size.
//
// Fetching stops once fewer than TSDB_MAX_REPLICA rows of room remain in the
// requested `rows`, or when the iterator is exhausted.
//
// Returns the number of rows written (also added to pShow->numOfRows), or an
// error code when one of the helper macros jumped to _OVER with code set.
//
// NOTE(review): MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL, MND_SHOW_CHECK_DB_PRIVILEGE
// and COL_DATA_SET_VAL_GOTO are defined outside this view; they appear to use
// locals declared here (pColInfo, numOfRows, code, lino, pUser, ...) without
// receiving them as arguments, so every declaration below is kept even where
// no direct use is visible.
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SUserObj *pUser = NULL;
  SDbObj   *pDb = NULL, *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_VNODES, PRIV_OBJ_DB, 0, _OVER);

  // keep TSDB_MAX_REPLICA rows of headroom before fetching another vgroup
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VNODES, _OVER);

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      // dnode id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->dnodeId, false, pVgroup, pShow->pIter, _OVER);
      // vgroup id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)b1, false, pVgroup, pShow->pIter, _OVER);

      // dnode is online?
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        // abandons the remaining replicas of this vgroup; the partly filled
        // row is not counted because numOfRows has not been incremented
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
      sdbRelease(pSdb, pDnode);

      // sync state, reported as offline when the hosting dnode is offline
      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      // role time, 0 when the dnode is offline
      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&roleTimeMs, false, pVgroup, pShow->pIter, _OVER);

      // start time, 0 when the dnode is offline
      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&startTimeMs, false, pVgroup, pShow->pIter, _OVER);

      // sync restore flag
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->syncRestore, false, pVgroup, pShow->pIter, _OVER);

      // estimated restore time; left empty when nothing is waiting to be applied
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      // restoreStr can hold up to 19 characters; together with the var-string
      // header that would not fit in buf[20], so use the size-bounded macro
      // (the same one used for b1 above) instead of the unbounded copy.
      STR_WITH_MAXSIZE_TO_VARSTR(buf, restoreStr, sizeof(buf));
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      // unapplied count
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&unappliedCount, false, pVgroup, pShow->pIter, _OVER);

      // buffer segment used
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentUsed, false, pVgroup, pShow->pIter, _OVER);

      // buffer segment size
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentSize, false, pVgroup, pShow->pIter, _OVER);

      numOfRows++;
    }
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve vnode info at line %d since %s", lino, tstrerror(code));
    return code;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1559

UNCOV
1560
// Cancels an in-progress SDB_VGROUP fetch identified by pIter.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1564

1565
// Adds one replica to pVgroup on the first suitable dnode of pArray and
// appends the updated vgroup to pTrans as a group redo log.
//
// pArray is sorted in place with mndCompareDnodeVnodes first. A dnode is
// skipped when it already hosts a replica of this vgroup; the first remaining
// dnode is the candidate, and the function returns after handling it:
//   - TSDB_CODE_MND_NO_ENOUGH_VNODES when numOfVnodes >= numOfSupportVnodes
//   - TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE when memAvail - vgMem - memUsed <= 0
//   - otherwise the replica slot is filled, pVgroup->replica,
//     pDnode->numOfVnodes and pDnode->memUsed are increased and the result of
//     encoding / appending / setting the raw status is returned.
// Returns TSDB_CODE_MND_NO_ENOUGH_DNODES when every dnode is already used or
// an array element cannot be read.
//
// NOTE(review): on an error after the counters were bumped nothing is rolled
// back here; presumably the caller discards pVgroup / pArray - confirm.
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;  // nothing to log for an unreadable element
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  // slot that the new replica will occupy
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // must be checked before pDnode->id is read in the loop below
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    // format fixed: the original "trans:%id" printed the id followed by a stray 'd'
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      // the raw is freed here only on the failure path
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1635

1636
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
16,776✔
1637
                                        SVnodeGid *pDelVgid) {
1638
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
16,776✔
1639
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
64,718✔
1640
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
47,942✔
1641
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
47,942✔
1642
          pDnode->numOfOtherNodes);
1643
  }
1644

1645
  int32_t code = -1;
16,776✔
1646
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
19,376✔
1647
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
19,101✔
1648

1649
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
38,232✔
1650
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
35,632✔
1651
      if (pVgid->dnodeId == pDnode->id) {
35,632✔
1652
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
16,501✔
1653
        pDnode->memUsed -= vgMem;
16,501✔
1654
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
16,501✔
1655
              " used:%" PRId64,
1656
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1657
        pDnode->numOfVnodes--;
16,501✔
1658
        pVgroup->replica--;
16,501✔
1659
        *pDelVgid = *pVgid;
16,501✔
1660
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
16,501✔
1661
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
16,501✔
1662
        code = 0;
16,501✔
1663
        goto _OVER;
16,501✔
1664
      }
1665
    }
1666
  }
1667

1668
_OVER:
275✔
1669
  if (code != 0) {
16,776✔
1670
    code = TSDB_CODE_APP_ERROR;
275✔
1671
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
275✔
1672
           tstrerror(code));
1673
    TAOS_RETURN(code);
275✔
1674
  }
1675

1676
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
46,123✔
1677
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
29,622✔
1678
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
29,622✔
1679
          pVgid->dnodeId);
1680
  }
1681

1682
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
16,501✔
1683
  if (pVgRaw == NULL) {
16,501✔
UNCOV
1684
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1685
    if (terrno != 0) code = terrno;
×
1686
    TAOS_RETURN(code);
×
1687
  }
1688
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
16,501✔
UNCOV
1689
    sdbFreeRaw(pVgRaw);
×
UNCOV
1690
    TAOS_RETURN(code);
×
1691
  }
1692
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
16,501✔
1693
  if (code != 0) {
16,501✔
UNCOV
1694
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1695
           tstrerror(code), __LINE__);
1696
  }
1697

1698
  TAOS_RETURN(code);
16,501✔
1699
}
1700

1701
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1702
                                                   SVnodeGid *pDelVgid) {
1703
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
UNCOV
1704
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
1705
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1706
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1707
  }
1708

1709
  int32_t code = -1;
×
1710
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
UNCOV
1711
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1712

1713
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1714
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
1715
      if (pVgid->dnodeId == pDnode->id) {
×
1716
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1717
        pDnode->memUsed -= vgMem;
×
1718
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1719
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
UNCOV
1720
        pDnode->numOfVnodes--;
×
UNCOV
1721
        pVgroup->replica--;
×
UNCOV
1722
        *pDelVgid = *pVgid;
×
1723
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1724
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1725
        code = 0;
×
1726
        goto _OVER;
×
1727
      }
1728
    }
1729
  }
1730

1731
_OVER:
×
1732
  if (code != 0) {
×
UNCOV
1733
    code = TSDB_CODE_APP_ERROR;
×
UNCOV
1734
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1735
    TAOS_RETURN(code);
×
1736
  }
1737

UNCOV
1738
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
UNCOV
1739
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
UNCOV
1740
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1741
  }
1742

UNCOV
1743
  TAOS_RETURN(code);
×
1744
}
1745

1746
// Appends a TDMT_DND_CREATE_VNODE redo action to pTrans, addressed to the
// dnode named by pVgid->dnodeId. TSDB_CODE_VND_ALREADY_EXIST is set as the
// action's acceptable code and the action is tagged with pVgroup->vgId as
// its group id.
//
// Returns 0 on success. On failure returns terrno when it is set, otherwise
// TSDB_CODE_MND_RETURN_VALUE_NULL (dnode not found / request not built), or
// the error of mndTransAppendRedoAction; the request buffer is freed when it
// could not be appended.
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    // same error convention as the sibling mndAdd*Action functions
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the error before the release below gets a chance to touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  // Release only after the request is built: pDnode is handed to
  // mndBuildCreateVnodeReq, so the reference has to be held until then.
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1772

1773
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,065✔
1774
                                       SDnodeObj *pDnode) {
1775
  int32_t      code = 0;
2,065✔
1776
  STransAction action = {0};
2,065✔
1777

1778
  action.epSet = mndGetDnodeEpset(pDnode);
2,065✔
1779

1780
  int32_t contLen = 0;
2,065✔
1781
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
2,065✔
1782
  if (pReq == NULL) {
2,065✔
UNCOV
1783
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1784
    if (terrno != 0) code = terrno;
×
UNCOV
1785
    TAOS_RETURN(code);
×
1786
  }
1787

1788
  action.pCont = pReq;
2,065✔
1789
  action.contLen = contLen;
2,065✔
1790
  action.msgType = TDMT_DND_CREATE_VNODE;
2,065✔
1791
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
2,065✔
1792
  action.groupId = pVgroup->vgId;
2,065✔
1793

1794
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,065✔
UNCOV
1795
    taosMemoryFree(pReq);
×
UNCOV
1796
    TAOS_RETURN(code);
×
1797
  }
1798

1799
  TAOS_RETURN(code);
2,065✔
1800
}
1801

1802
// Appends a TDMT_VND_ALTER_CONFIRM redo action to pTrans, addressed to the
// vgroup's endpoint set. The message body is a bare SMsgHead carrying the
// length and vgId in network byte order. TSDB_CODE_VND_INVALID_VGROUP_ID is
// registered as retry code and pVgroup->vgId as group id. pDb is not used.
//
// Returns 0 on success, terrno when the header cannot be allocated, or the
// error of mndTransAppendRedoAction (the header is freed in that case).
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction confirmAction = {0};
  confirmAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);

  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsgHead = taosMemoryMalloc(headLen);
  if (pMsgHead == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsgHead->contLen = htonl(headLen);
  pMsgHead->vgId = htonl(pVgroup->vgId);

  confirmAction.msgType = TDMT_VND_ALTER_CONFIRM;
  confirmAction.pCont = pMsgHead;
  confirmAction.contLen = headLen;
  // an incorrect redirect result will cause this error, so retry on it
  confirmAction.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  confirmAction.groupId = pVgroup->vgId;

  int32_t code = mndTransAppendRedoAction(pTrans, &confirmAction);
  if (code != 0) {
    taosMemoryFree(pMsgHead);
  }
  TAOS_RETURN(code);
}
1831

1832
// Appends a TDMT_SYNC_CONFIG_CHANGE redo action to pTrans, addressed to the
// endpoint set of pNewVgroup. The message is an SMsgHead (total length and
// vgId in network byte order) followed by the alter-replica request built
// for `dnodeId`. pOldVgroup is not used.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the request cannot be built; terrno if the message buffer
// cannot be allocated; otherwise the error of mndTransAppendRedoAction, in
// which case the message buffer is freed.
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  STransAction cfgAction = {0};
  cfgAction.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  int32_t bodyLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);

  // the body goes right behind the header; the standalone copy is then done
  memcpy((char *)pMsg + sizeof(SMsgHead), pBody, bodyLen);
  taosMemoryFree(pBody);

  cfgAction.msgType = TDMT_SYNC_CONFIG_CHANGE;
  cfgAction.contLen = msgLen;
  cfgAction.pCont = pMsg;

  int32_t code = mndTransAppendRedoAction(pTrans, &cfgAction);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1871

1872
// Appends a TDMT_VND_ALTER_HASHRANGE redo action to pTrans, addressed to the
// endpoint set of pVgroup, with TSDB_CODE_VND_ALREADY_EXIST as acceptable
// code. The request is built from srcVgId and pVgroup.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the request cannot be built; otherwise the error of
// mndTransAppendRedoAction, in which case the request buffer is freed.
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  STransAction rangeAction = {0};
  rangeAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  int32_t reqLen = 0;
  void   *pRangeReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &reqLen);
  if (pRangeReq == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  rangeAction.msgType = TDMT_VND_ALTER_HASHRANGE;
  rangeAction.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  rangeAction.contLen = reqLen;
  rangeAction.pCont = pRangeReq;

  int32_t code = mndTransAppendRedoAction(pTrans, &rangeAction);
  if (code != 0) {
    taosMemoryFree(pRangeReq);
    TAOS_RETURN(code);
  }

  // logged only once the action is part of the transaction
  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1898

1899
// Appends a TDMT_VND_ALTER_CONFIG redo action to pTrans, addressed to the
// endpoint set of pVgroup and tagged with pVgroup->vgId as group id.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the request cannot be built; otherwise the error of
// mndTransAppendRedoAction, in which case the request buffer is freed.
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  STransAction alterAction = {0};
  alterAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  int32_t reqLen = 0;
  void   *pAlterReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &reqLen);
  if (pAlterReq == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  alterAction.groupId = pVgroup->vgId;
  alterAction.msgType = TDMT_VND_ALTER_CONFIG;
  alterAction.contLen = reqLen;
  alterAction.pCont = pAlterReq;

  int32_t code = mndTransAppendRedoAction(pTrans, &alterAction);
  if (code != 0) {
    taosMemoryFree(pAlterReq);
  }
  TAOS_RETURN(code);
}
1924

1925
// Encodes pVg and appends it to pTrans as a prepare log, then marks the raw
// record SDB_STATUS_CREATING. pMnode is not used.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if encoding fails; the error of mndTransAppendPrepareLog (the
// raw record is freed in that case); or the error of sdbSetRawStatus.
//
// The sdbSetRawStatus result used to be discarded, which left the following
// `code != 0` check unreachable and reported a failed status update as
// success. It is now returned, as the other vgroup helpers in this file do.
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pRaw)) != 0) {
    // not appended, so the raw record is freed here
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // From here on pRaw is not freed by this function, matching the original
  // success path (presumably the transaction owns it once appended).
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
1949

1950
// Appends a TDMT_VND_ALTER_REPLICA redo action to pTrans, addressed to dnode
// `dnodeId` and tagged with pVgroup->vgId as group id.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the dnode cannot be acquired or the request cannot be
// built; otherwise the error of mndTransAppendRedoAction, in which case the
// request buffer is freed.
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t acquireErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(acquireErr);
  }

  // only the endpoint set is needed from the dnode object
  STransAction replicaAction = {0};
  replicaAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pReplicaReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pReplicaReq == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  replicaAction.groupId = pVgroup->vgId;
  replicaAction.msgType = TDMT_VND_ALTER_REPLICA;
  replicaAction.contLen = reqLen;
  replicaAction.pCont = pReplicaReq;

  int32_t code = mndTransAppendRedoAction(pTrans, &replicaAction);
  if (code != 0) {
    taosMemoryFree(pReplicaReq);
  }
  TAOS_RETURN(code);
}
1983

UNCOV
1984
// Appends a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action to pTrans,
// addressed to dnode `dnodeId`. TSDB_CODE_VND_ALREADY_IS_VOTER is the
// acceptable code and TSDB_CODE_VND_NOT_CATCH_UP the retry code. No group id
// is set on this action.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the dnode cannot be acquired or the request cannot be
// built; otherwise the error of mndTransAppendRedoAction, in which case the
// request buffer is freed.
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t acquireErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(acquireErr);
  }

  STransAction catchupAction = {0};
  catchupAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCatchupReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pCatchupReq == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  catchupAction.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  catchupAction.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  catchupAction.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  catchupAction.contLen = reqLen;
  catchupAction.pCont = pCatchupReq;

  int32_t code = mndTransAppendRedoAction(pTrans, &catchupAction);
  if (code != 0) {
    taosMemoryFree(pCatchupReq);
  }
  TAOS_RETURN(code);
}
2018

2019
// Appends a TDMT_DND_ALTER_VNODE_TYPE redo action to pTrans, addressed to
// dnode `dnodeId`. The body is the alter-replica request for that dnode.
// TSDB_CODE_VND_ALREADY_IS_VOTER is the acceptable code,
// TSDB_CODE_VND_NOT_CATCH_UP the retry code and pVgroup->vgId the group id.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the dnode cannot be acquired or the request cannot be
// built; otherwise the error of mndTransAppendRedoAction, in which case the
// request buffer is freed.
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t acquireErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(acquireErr);
  }

  STransAction typeAction = {0};
  typeAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pTypeReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &reqLen);
  if (pTypeReq == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  typeAction.groupId = pVgroup->vgId;
  typeAction.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  typeAction.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  typeAction.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  typeAction.contLen = reqLen;
  typeAction.pCont = pTypeReq;

  int32_t code = mndTransAppendRedoAction(pTrans, &typeAction);
  if (code != 0) {
    taosMemoryFree(pTypeReq);
  }
  TAOS_RETURN(code);
}
2054

2055
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,065✔
2056
                                          SDnodeObj *pDnode) {
2057
  int32_t      code = 0;
2,065✔
2058
  STransAction action = {0};
2,065✔
2059
  action.epSet = mndGetDnodeEpset(pDnode);
2,065✔
2060

2061
  int32_t contLen = 0;
2,065✔
2062
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
2,065✔
2063
  if (pReq == NULL) {
2,065✔
UNCOV
2064
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2065
    if (terrno != 0) code = terrno;
×
UNCOV
2066
    TAOS_RETURN(code);
×
2067
  }
2068

2069
  action.pCont = pReq;
2,065✔
2070
  action.contLen = contLen;
2,065✔
2071
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
2,065✔
2072
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
2,065✔
2073
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
2,065✔
2074
  action.groupId = pVgroup->vgId;
2,065✔
2075

2076
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,065✔
UNCOV
2077
    taosMemoryFree(pReq);
×
UNCOV
2078
    TAOS_RETURN(code);
×
2079
  }
2080

2081
  TAOS_RETURN(code);
2,065✔
2082
}
2083

2084
// Appends a TDMT_VND_DISABLE_WRITE redo action to pTrans, addressed to dnode
// `dnodeId`; the request is built for pVgroup->vgId. No acceptable code,
// retry code or group id is set on this action.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the dnode cannot be acquired or the request cannot be
// built; otherwise the error of mndTransAppendRedoAction, in which case the
// request buffer is freed.
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t acquireErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(acquireErr);
  }

  STransAction disableAction = {0};
  disableAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pDisableReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pDisableReq == NULL) {
    int32_t buildErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(buildErr);
  }

  disableAction.msgType = TDMT_VND_DISABLE_WRITE;
  disableAction.contLen = reqLen;
  disableAction.pCont = pDisableReq;

  int32_t code = mndTransAppendRedoAction(pTrans, &disableAction);
  if (code != 0) {
    taosMemoryFree(pDisableReq);
  }
  TAOS_RETURN(code);
}
2117

2118
// Appends a TDMT_DND_DROP_VNODE action to pTrans, addressed to the dnode
// named by pVgid->dnodeId. TSDB_CODE_VND_NOT_EXIST is the acceptable code and
// pVgroup->vgId the group id.
//
// isRedo  true: append as a redo action; false: append as an undo action.
//
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0) if the dnode cannot be acquired or the request cannot be
// built; otherwise the error of the append call, in which case the request
// buffer is freed.
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the error before the release below gets a chance to touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  // Release only after the request is built: pDnode is handed to
  // mndBuildDropVnodeReq, so the reference has to be held until then.
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  code = isRedo ? mndTransAppendRedoAction(pTrans, &action) : mndTransAppendUndoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2160

2161
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
13,994✔
2162
                                    SArray *pArray, bool force, bool unsafe) {
2163
  int32_t code = 0;
13,994✔
2164
  SVgObj  newVg = {0};
13,994✔
2165
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
13,994✔
2166

2167
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
13,994✔
2168
  for (int32_t i = 0; i < newVg.replica; ++i) {
45,194✔
2169
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
31,200✔
2170
  }
2171

2172
  if (!force) {
13,994✔
2173
#if 1
2174
    {
2175
#else
2176
    if (newVg.replica == 1) {
2177
#endif
2178
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
13,994✔
2179
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
13,994✔
2180
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
45,194✔
2181
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
31,200✔
2182
      }
2183
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
13,994✔
2184
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
13,994✔
2185

2186
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
13,994✔
2187
      newVg.replica--;
13,994✔
2188
      SVnodeGid del = newVg.vnodeGid[vnIndex];
13,994✔
2189
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
13,994✔
2190
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
13,994✔
2191
      {
2192
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
13,994✔
2193
        if (pRaw == NULL) {
13,994✔
UNCOV
2194
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2195
          if (terrno != 0) code = terrno;
×
2196
          TAOS_RETURN(code);
×
2197
        }
2198
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
13,994✔
UNCOV
2199
          sdbFreeRaw(pRaw);
×
UNCOV
2200
          TAOS_RETURN(code);
×
2201
        }
2202
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
13,994✔
2203
        if (code != 0) {
13,994✔
UNCOV
2204
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2205
          return code;
×
2206
        }
2207
      }
2208

2209
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
13,994✔
2210
      for (int32_t i = 0; i < newVg.replica; ++i) {
45,194✔
2211
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
31,200✔
2212
      }
2213
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
13,994✔
2214
#if 1
2215
    }
2216
#else
2217
    } else {  // new replica == 3
2218
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2219
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2220
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2221
      newVg.replica--;
2222
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2223
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2224
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2225
      {
2226
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2227
        if (pRaw == NULL) return -1;
2228
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2229
          sdbFreeRaw(pRaw);
2230
          return -1;
2231
        }
2232
      }
2233

2234
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2235
      for (int32_t i = 0; i < newVg.replica; ++i) {
2236
        if (i == vnIndex) continue;
2237
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2238
      }
2239
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2240
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2241
    }
2242
#endif
2243
  } else {
2244
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
2245
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
2246
    newVg.replica--;
×
2247
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
UNCOV
2248
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2249
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2250
    {
2251
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
UNCOV
2252
      if (pRaw == NULL) {
×
2253
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2254
        if (terrno != 0) code = terrno;
×
2255
        TAOS_RETURN(code);
×
2256
      }
UNCOV
2257
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
UNCOV
2258
        sdbFreeRaw(pRaw);
×
UNCOV
2259
        TAOS_RETURN(code);
×
2260
      }
2261
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2262
      if (code != 0) {
×
UNCOV
2263
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2264
        return code;
×
2265
      }
2266
    }
2267

2268
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
2269
      if (i != vnIndex) {
×
2270
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2271
      }
2272
    }
2273
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
2274
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2275

2276
    if (newVg.replica == 1) {
×
2277
      if (force && !unsafe) {
×
2278
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2279
      }
2280

2281
      SSdb *pSdb = pMnode->pSdb;
×
2282
      void *pIter = NULL;
×
2283

2284
      while (1) {
×
2285
        SStbObj *pStb = NULL;
×
UNCOV
2286
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
UNCOV
2287
        if (pIter == NULL) break;
×
2288

2289
        if (strcmp(pStb->db, pDb->name) == 0) {
×
UNCOV
2290
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
UNCOV
2291
            sdbCancelFetch(pSdb, pIter);
×
2292
            sdbRelease(pSdb, pStb);
×
UNCOV
2293
            TAOS_RETURN(code);
×
2294
          }
2295
        }
2296

UNCOV
2297
        sdbRelease(pSdb, pStb);
×
2298
      }
2299

2300
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2301
    }
2302
  }
2303

2304
  {
2305
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
13,994✔
2306
    if (pRaw == NULL) {
13,994✔
UNCOV
2307
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2308
      if (terrno != 0) code = terrno;
×
2309
      TAOS_RETURN(code);
×
2310
    }
2311
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
13,994✔
UNCOV
2312
      sdbFreeRaw(pRaw);
×
UNCOV
2313
      TAOS_RETURN(code);
×
2314
    }
2315
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
13,994✔
2316
    if (code != 0) {
13,994✔
UNCOV
2317
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2318
      return code;
×
2319
    }
2320
  }
2321

2322
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
13,994✔
2323
  for (int32_t i = 0; i < newVg.replica; ++i) {
45,194✔
2324
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
31,200✔
2325
  }
2326
  TAOS_RETURN(code);
13,994✔
2327
}
2328

2329
/*
 * For every vgroup that has a vnode on dnode `delDnodeId`, append to pTrans the actions that move that
 * vnode elsewhere (see mndSetMoveVgroupInfoToTrans).
 *
 * The candidate dnode array is built once with mndBuildDnodesArray(pMnode, delDnodeId, NULL) and destroyed
 * before returning. Iteration stops at the first failure and that code is returned; 0 is returned when
 * every affected vgroup was processed (or none was affected).
 */
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // locate the replica slot hosted on the dnode being removed, if any
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // the move logic dereferences pDb (e.g. pDb->name), so a missing db must fail here
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("vgId:%d, failed to acquire db:%s since %s", pVgroup->vgId, pVgroup->dbName, tstrerror(code));
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      // leaving the loop early: the iterator must be cancelled explicitly
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2372

2373
/*
 * Grow *pVgroup by one replica on dnode `newDnodeId` and append the corresponding steps to pTrans.
 *
 * pVgroup is modified in place: the slot at index `replica` is filled (offline, learner role) and
 * `replica` is incremented. The updated vgroup is appended as a redo log, then:
 *   1. learner phase: alter every pre-existing replica and create the vnode on the new dnode;
 *   2. voter phase  : switch the new slot to voter, alter its type, and alter the pre-existing replicas again;
 *   3. confirm.
 * Returns 0 on success or the first non-zero code reported by a step.
 */
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // take the first unused slot for the new member
  SVnodeGid *pAdded = pVgroup->vnodeGid + pVgroup->replica;
  pVgroup->replica += 1;
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // phase 1: the new member joins as a learner
  int32_t idx = 0;
  while (idx < pVgroup->replica - 1) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
    ++idx;
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // phase 2: promote the new member to voter
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  idx = 0;
  while (idx < pVgroup->replica - 1) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[idx].dnodeId));
    ++idx;
  }

  // phase 3: confirm the new configuration
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2419

2420
/*
 * Remove the replica hosted on dnode `delDnodeId` from *pVgroup and append the corresponding steps to
 * pTrans.
 *
 * If no replica lives on that dnode the function returns 0 without touching pVgroup or pTrans.
 * Otherwise pVgroup is modified in place: `replica` is decremented, the last slot is moved into the freed
 * slot and the last slot is zeroed. The updated vgroup is appended as a redo log, followed by a drop
 * action for the removed vnode, an alter action for every remaining replica, and a confirm action.
 * Returns 0 on success or the first non-zero code reported by a step.
 */
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  SVnodeGid *pGid = NULL;
  SVnodeGid  delGid = {0};
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      pGid = &pVgroup->vnodeGid[i];
      break;
    }
  }

  if (pGid == NULL) return 0;

  pVgroup->replica--;
  delGid = *pGid;
  // Struct assignment rather than memcpy: when the removed vnode is the last slot, source and destination
  // are the same object, which memcpy does not permit but assignment does.
  *pGid = pVgroup->vnodeGid[pVgroup->replica];
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2465

2466
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
36,330✔
2467
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2468
                                     SDnodeObj *pOld3) {
2469
  int32_t code = -1;
36,330✔
2470
  STrans *pTrans = NULL;
36,330✔
2471

2472
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
36,330✔
2473
  if (pTrans == NULL) {
36,330✔
UNCOV
2474
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2475
    if (terrno != 0) code = terrno;
×
UNCOV
2476
    goto _OVER;
×
2477
  }
2478

2479
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
36,330✔
2480
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
36,330✔
2481
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);
36,267✔
2482

2483
  mndTransSetSerial(pTrans);
36,267✔
2484
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
36,267✔
2485

2486
  SVgObj newVg = {0};
36,267✔
2487
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
36,267✔
2488
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
36,267✔
2489
  for (int32_t i = 0; i < newVg.replica; ++i) {
122,516✔
2490
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
86,249✔
2491
          syncStr(newVg.vnodeGid[i].syncState));
2492
  }
2493

2494
  if (pNew1 != NULL && pOld1 != NULL) {
36,267✔
2495
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
36,267✔
2496
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
36,267✔
2497
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
676✔
2498
             pNew1->numOfSupportVnodes);
2499
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
676✔
2500
      goto _OVER;
676✔
2501
    }
2502

2503
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
35,591✔
2504
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
35,591✔
UNCOV
2505
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2506
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
UNCOV
2507
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
UNCOV
2508
      goto _OVER;
×
2509
    } else {
2510
      pNew1->memUsed += vgMem;
35,591✔
2511
    }
2512

2513
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
35,591✔
2514
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
35,591✔
2515
  }
2516

2517
  if (pNew2 != NULL && pOld2 != NULL) {
35,591✔
2518
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
10,780✔
2519
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
10,780✔
UNCOV
2520
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2521
             pNew2->numOfSupportVnodes);
2522
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
UNCOV
2523
      goto _OVER;
×
2524
    }
2525
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
10,780✔
2526
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
10,780✔
UNCOV
2527
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2528
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
UNCOV
2529
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
UNCOV
2530
      goto _OVER;
×
2531
    } else {
2532
      pNew2->memUsed += vgMem;
10,780✔
2533
    }
2534
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
10,780✔
2535
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
10,780✔
2536
  }
2537

2538
  if (pNew3 != NULL && pOld3 != NULL) {
35,591✔
2539
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
4,170✔
2540
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
4,170✔
UNCOV
2541
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2542
             pNew3->numOfSupportVnodes);
2543
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
UNCOV
2544
      goto _OVER;
×
2545
    }
2546
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
4,170✔
2547
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
4,170✔
UNCOV
2548
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2549
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
UNCOV
2550
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
UNCOV
2551
      goto _OVER;
×
2552
    } else {
2553
      pNew3->memUsed += vgMem;
4,170✔
2554
    }
2555
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
4,170✔
2556
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
4,170✔
2557
  }
2558

2559
  {
2560
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
35,591✔
2561
    if (pRaw == NULL) {
35,591✔
UNCOV
2562
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2563
      if (terrno != 0) code = terrno;
×
2564
      goto _OVER;
×
2565
    }
2566
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
35,591✔
UNCOV
2567
      sdbFreeRaw(pRaw);
×
UNCOV
2568
      goto _OVER;
×
2569
    }
2570
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
35,591✔
2571
    if (code != 0) {
35,591✔
UNCOV
2572
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2573
      goto _OVER;
×
2574
    }
2575
  }
2576

2577
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
35,591✔
2578
  for (int32_t i = 0; i < newVg.replica; ++i) {
119,812✔
2579
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
84,221✔
2580
  }
2581

2582
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
35,591✔
2583
  code = 0;
34,961✔
2584

2585
_OVER:
36,330✔
2586
  mndTransDrop(pTrans);
36,330✔
2587
  mndReleaseDb(pMnode, pDb);
36,330✔
2588
  TAOS_RETURN(code);
36,330✔
2589
}
2590

2591
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
44,285✔
2592
  SMnode    *pMnode = pReq->info.node;
44,285✔
2593
  SDnodeObj *pNew1 = NULL;
44,285✔
2594
  SDnodeObj *pNew2 = NULL;
44,285✔
2595
  SDnodeObj *pNew3 = NULL;
44,285✔
2596
  SDnodeObj *pOld1 = NULL;
44,285✔
2597
  SDnodeObj *pOld2 = NULL;
44,285✔
2598
  SDnodeObj *pOld3 = NULL;
44,285✔
2599
  SVgObj    *pVgroup = NULL;
44,285✔
2600
  SDbObj    *pDb = NULL;
44,285✔
2601
  int32_t    code = -1;
44,285✔
2602
  int64_t    curMs = taosGetTimestampMs();
44,285✔
2603
  int32_t    newDnodeId[3] = {0};
44,285✔
2604
  int32_t    oldDnodeId[3] = {0};
44,285✔
2605
  int32_t    newIndex = -1;
44,285✔
2606
  int32_t    oldIndex = -1;
44,285✔
2607
  int64_t    tss = taosGetTimestampMs();
44,285✔
2608

2609
  SRedistributeVgroupReq req = {0};
44,285✔
2610
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
44,285✔
UNCOV
2611
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2612
    goto _OVER;
×
2613
  }
2614

2615
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
44,285✔
2616
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
44,285✔
UNCOV
2617
    goto _OVER;
×
2618
  }
2619

2620
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
44,285✔
2621
  if (pVgroup == NULL) {
44,285✔
2622
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
2,028✔
2623
    if (terrno != 0) code = terrno;
2,028✔
2624
    goto _OVER;
2,028✔
2625
  }
2626
  if (pVgroup->mountVgId) {
42,257✔
UNCOV
2627
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
UNCOV
2628
    goto _OVER;
×
2629
  }
2630
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
42,257✔
2631
  if (pDb == NULL) {
42,257✔
2632
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2633
    if (terrno != 0) code = terrno;
×
UNCOV
2634
    goto _OVER;
×
2635
  }
2636

2637
  if (pVgroup->replica == 1) {
42,257✔
2638
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
10,654✔
UNCOV
2639
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2640
      goto _OVER;
×
2641
    }
2642

2643
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
10,654✔
2644
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2645
      code = 0;
×
UNCOV
2646
      goto _OVER;
×
2647
    }
2648

2649
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
10,654✔
2650
    if (pNew1 == NULL) {
10,654✔
UNCOV
2651
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2652
      if (terrno != 0) code = terrno;
×
UNCOV
2653
      goto _OVER;
×
2654
    }
2655
    if (!mndIsDnodeOnline(pNew1, curMs)) {
10,654✔
2656
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2657
      goto _OVER;
×
2658
    }
2659

2660
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
10,654✔
2661
    if (pOld1 == NULL) {
10,654✔
UNCOV
2662
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2663
      if (terrno != 0) code = terrno;
×
UNCOV
2664
      goto _OVER;
×
2665
    }
2666
    if (!mndIsDnodeOnline(pOld1, curMs)) {
10,654✔
UNCOV
2667
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2668
      goto _OVER;
×
2669
    }
2670

2671
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
10,654✔
2672

2673
  } else if (pVgroup->replica == 3) {
31,603✔
2674
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
30,233✔
2675
      code = TSDB_CODE_MND_INVALID_REPLICA;
2,704✔
2676
      goto _OVER;
2,704✔
2677
    }
2678

2679
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
27,529✔
2680
      code = TSDB_CODE_MND_INVALID_REPLICA;
676✔
2681
      goto _OVER;
676✔
2682
    }
2683

2684
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
26,853✔
2685
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
13,260✔
2686
      newDnodeId[++newIndex] = req.dnodeId1;
11,251✔
2687
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
11,251✔
2688
    }
2689

2690
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
26,853✔
2691
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
18,050✔
2692
      newDnodeId[++newIndex] = req.dnodeId2;
13,168✔
2693
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
13,168✔
2694
    }
2695

2696
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
26,853✔
2697
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
21,292✔
2698
      newDnodeId[++newIndex] = req.dnodeId3;
17,233✔
2699
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
17,233✔
2700
    }
2701

2702
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
26,853✔
2703
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
13,893✔
2704
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
12,561✔
2705
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
12,561✔
2706
    }
2707

2708
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
26,853✔
2709
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
17,417✔
2710
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
13,188✔
2711
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
13,188✔
2712
    }
2713

2714
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
26,853✔
2715
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
19,962✔
2716
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
15,903✔
2717
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
15,903✔
2718
    }
2719

2720
    if (newDnodeId[0] != 0) {
26,853✔
2721
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
26,012✔
2722
      if (pNew1 == NULL) {
26,012✔
UNCOV
2723
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2724
        if (terrno != 0) code = terrno;
×
UNCOV
2725
        goto _OVER;
×
2726
      }
2727
      if (!mndIsDnodeOnline(pNew1, curMs)) {
26,012✔
2728
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
676✔
2729
        goto _OVER;
676✔
2730
      }
2731
    }
2732

2733
    if (newDnodeId[1] != 0) {
26,177✔
2734
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
10,440✔
2735
      if (pNew2 == NULL) {
10,440✔
UNCOV
2736
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2737
        if (terrno != 0) code = terrno;
×
UNCOV
2738
        goto _OVER;
×
2739
      }
2740
      if (!mndIsDnodeOnline(pNew2, curMs)) {
10,440✔
2741
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
255✔
2742
        goto _OVER;
255✔
2743
      }
2744
    }
2745

2746
    if (newDnodeId[2] != 0) {
25,922✔
2747
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
4,945✔
2748
      if (pNew3 == NULL) {
4,945✔
UNCOV
2749
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2750
        if (terrno != 0) code = terrno;
×
UNCOV
2751
        goto _OVER;
×
2752
      }
2753
      if (!mndIsDnodeOnline(pNew3, curMs)) {
4,945✔
2754
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2755
        goto _OVER;
×
2756
      }
2757
    }
2758

2759
    if (oldDnodeId[0] != 0) {
25,922✔
2760
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
25,081✔
2761
      if (pOld1 == NULL) {
25,081✔
UNCOV
2762
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2763
        if (terrno != 0) code = terrno;
×
UNCOV
2764
        goto _OVER;
×
2765
      }
2766
      if (!mndIsDnodeOnline(pOld1, curMs)) {
25,081✔
2767
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
775✔
2768
        goto _OVER;
775✔
2769
      }
2770
    }
2771

2772
    if (oldDnodeId[1] != 0) {
25,147✔
2773
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
9,410✔
2774
      if (pOld2 == NULL) {
9,410✔
UNCOV
2775
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2776
        if (terrno != 0) code = terrno;
×
UNCOV
2777
        goto _OVER;
×
2778
      }
2779
      if (!mndIsDnodeOnline(pOld2, curMs)) {
9,410✔
2780
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2781
        goto _OVER;
×
2782
      }
2783
    }
2784

2785
    if (oldDnodeId[2] != 0) {
25,147✔
2786
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
4,170✔
2787
      if (pOld3 == NULL) {
4,170✔
UNCOV
2788
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2789
        if (terrno != 0) code = terrno;
×
UNCOV
2790
        goto _OVER;
×
2791
      }
2792
      if (!mndIsDnodeOnline(pOld3, curMs)) {
4,170✔
UNCOV
2793
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2794
        goto _OVER;
×
2795
      }
2796
    }
2797

2798
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
25,147✔
2799
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2800
      code = 0;
841✔
2801
      goto _OVER;
841✔
2802
    }
2803

2804
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
24,306✔
2805

2806
  } else if (pVgroup->replica == 2) {
1,370✔
2807
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
1,370✔
UNCOV
2808
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2809
      goto _OVER;
×
2810
    }
2811

2812
    if (req.dnodeId1 == req.dnodeId2) {
1,370✔
UNCOV
2813
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2814
      goto _OVER;
×
2815
    }
2816

2817
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
1,370✔
2818
      newDnodeId[++newIndex] = req.dnodeId1;
1,370✔
2819
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,370✔
2820
    }
2821

2822
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,370✔
2823
      newDnodeId[++newIndex] = req.dnodeId2;
1,370✔
2824
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,370✔
2825
    }
2826

2827
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
1,370✔
2828
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
1,370✔
2829
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,370✔
2830
    }
2831

2832
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,370✔
2833
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
1,370✔
2834
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,370✔
2835
    }
2836

2837
    if (newDnodeId[0] != 0) {
1,370✔
2838
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
1,370✔
2839
      if (pNew1 == NULL) {
1,370✔
UNCOV
2840
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2841
        if (terrno != 0) code = terrno;
×
UNCOV
2842
        goto _OVER;
×
2843
      }
2844
      if (!mndIsDnodeOnline(pNew1, curMs)) {
1,370✔
2845
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2846
        goto _OVER;
×
2847
      }
2848
    }
2849

2850
    if (newDnodeId[1] != 0) {
1,370✔
2851
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
1,370✔
2852
      if (pNew2 == NULL) {
1,370✔
UNCOV
2853
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2854
        if (terrno != 0) code = terrno;
×
UNCOV
2855
        goto _OVER;
×
2856
      }
2857
      if (!mndIsDnodeOnline(pNew2, curMs)) {
1,370✔
2858
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2859
        goto _OVER;
×
2860
      }
2861
    }
2862

2863
    if (oldDnodeId[0] != 0) {
1,370✔
2864
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
1,370✔
2865
      if (pOld1 == NULL) {
1,370✔
UNCOV
2866
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2867
        if (terrno != 0) code = terrno;
×
UNCOV
2868
        goto _OVER;
×
2869
      }
2870
      if (!mndIsDnodeOnline(pOld1, curMs)) {
1,370✔
2871
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
2872
        goto _OVER;
×
2873
      }
2874
    }
2875

2876
    if (oldDnodeId[1] != 0) {
1,370✔
2877
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
1,370✔
2878
      if (pOld2 == NULL) {
1,370✔
UNCOV
2879
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2880
        if (terrno != 0) code = terrno;
×
UNCOV
2881
        goto _OVER;
×
2882
      }
2883
      if (!mndIsDnodeOnline(pOld2, curMs)) {
1,370✔
2884
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2885
        goto _OVER;
×
2886
      }
2887
    }
2888

2889
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
1,370✔
2890
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
UNCOV
2891
      code = 0;
×
UNCOV
2892
      goto _OVER;
×
2893
    }
2894

2895
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
1,370✔
2896
  } else {
UNCOV
2897
    code = TSDB_CODE_MND_REQ_REJECTED;
×
UNCOV
2898
    goto _OVER;
×
2899
  }
2900

2901
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
36,330✔
2902

2903
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
36,330✔
2904
    char obj[33] = {0};
36,330✔
2905
    (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
36,330✔
2906

2907
    int64_t tse = taosGetTimestampMs();
36,330✔
2908
    double  duration = (double)(tse - tss);
36,330✔
2909
    duration = duration / 1000;
36,330✔
2910
    auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen, duration, 0);
36,330✔
2911
  }
2912
_OVER:
44,285✔
2913
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
44,285✔
2914
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
8,483✔
2915
           req.dnodeId3, tstrerror(code));
2916
  }
2917

2918
  mndReleaseDnode(pMnode, pNew1);
44,285✔
2919
  mndReleaseDnode(pMnode, pNew2);
44,285✔
2920
  mndReleaseDnode(pMnode, pNew3);
44,285✔
2921
  mndReleaseDnode(pMnode, pOld1);
44,285✔
2922
  mndReleaseDnode(pMnode, pOld2);
44,285✔
2923
  mndReleaseDnode(pMnode, pOld3);
44,285✔
2924
  mndReleaseVgroup(pMnode, pVgroup);
44,285✔
2925
  mndReleaseDb(pMnode, pDb);
44,285✔
2926
  tFreeSRedistributeVgroupReq(&req);
44,285✔
2927

2928
  TAOS_RETURN(code);
44,285✔
2929
}
2930

2931
/*
 * Build a "force become follower" message for the given vgroup.
 *
 * The message is an SMsgHead followed by the serialized SForceBecomeFollowerReq.
 * Both header fields are stored with htonl().
 *
 * pMnode   - unused here; kept for a uniform builder signature.
 * pVgroup  - vgroup whose vgId is written into the header and the request.
 * dnodeId  - unused here.
 * pContLen - out: total message length (header + payload); written only on success.
 *
 * Returns a buffer obtained from taosMemoryMalloc that the caller must release,
 * or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY on any failure.
 */
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // First pass with a NULL buffer only measures the serialized payload.
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload starts after the header, so only bodyLen bytes are writable there.
  // Passing the full contLen would overstate the capacity by sizeof(SMsgHead).
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
2961

2962
/*
 * Append a TDMT_SYNC_FORCE_FOLLOWER redo action, addressed to dnode `dnodeId`,
 * to transaction `pTrans`.
 *
 * Returns 0 on success. On failure returns terrno when it is non-zero,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL, or the code reported by
 * mndTransAppendRedoAction. The message buffer is released here when the
 * append fails.
 */
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // Only the endpoint set is needed from the dnode, so release it right away.
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_SYNC_FORCE_FOLLOWER;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
2994

2995
/*
 * Serialize an SAlterVnodeElectBaselineReq carrying the vgroup id and the
 * election baseline `ms`.
 *
 * pMnode, pDb and dnodeId are not used by this builder.
 * pContLen receives the serialized length on success.
 *
 * Returns a taosMemoryMalloc'ed buffer owned by the caller, or NULL with
 * terrno set to TSDB_CODE_OUT_OF_MEMORY.
 */
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen, int32_t ms) {
  SAlterVnodeElectBaselineReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.electBaseLine = ms;

  // Measuring pass: a NULL buffer yields the number of bytes required.
  const int32_t msgLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (msgLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  const int32_t written = tSerializeSAlterVnodeReplicaReq(pMsg, msgLen, &alterReq);
  if (written < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = msgLen;
  return pMsg;
}
3023

3024
/*
 * Append a TDMT_VND_ALTER_ELECTBASELINE redo action for one vnode of
 * `pVgroup`, addressed to dnode `dnodeId`, with election baseline `ms`.
 * The action's groupId is set to the vgroup id.
 *
 * Returns 0 on success. On failure returns terrno when it is non-zero,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL, or the code reported by
 * mndTransAppendRedoAction.
 */
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                                             SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // The dnode is only needed for its endpoint set.
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &msgLen, ms);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_VND_ALTER_ELECTBASELINE;
  action.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The buffer was not taken by the transaction; release it here.
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
3057

3058
/*
 * Queue one election-baseline action per vnode slot of `pVgroup`.
 *
 * The slot equal to `index % 3` gets a baseline of 1500, every other slot
 * gets 5000. With a negative `index` (callers in this file pass -1) no slot
 * matches, so all slots get 5000.
 *
 * Returns -1 without queuing anything when the vgroup has at most one
 * replica, the first non-zero code from the per-vnode helper otherwise,
 * and 0 when all actions were queued.
 */
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans,
                                                              int32_t index) {
  const int32_t vgid = pVgroup->vgId;
  const int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  const int32_t preferred = index % 3;

  // NOTE(review): the bound is the literal 3, not pVgroup->replica; confirm this
  // is intended for vgroups with two replicas.
  for (int32_t slot = 0; slot < 3; slot++) {
    const bool isPreferred = (slot == preferred);
    if (isPreferred) {
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, pVgroup->vnodeGid[slot].dnodeId);
    }
    TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(
        pMnode, pTrans, NULL, pVgroup, pVgroup->vnodeGid[slot].dnodeId, isPreferred ? 1500 : 5000));
  }

  return 0;
}
3083

3084
/*
 * Add the actions that move leadership of `pVgroup` away from its current
 * leader to transaction `pTrans`.
 *
 * Steps, when the leader's dnode exists and is online:
 *   1. set election baselines with `index` selecting the preferred slot,
 *   2. force the current leader to become a follower,
 *   3. add a confirm action,
 *   4. reset election baselines (index -1),
 *   5. check that the vgroup's database can still be acquired.
 *
 * Returns -1 when the vgroup has at most one replica, 0 when nothing had to
 * be done or everything was queued, and the failing step's code otherwise.
 */
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t       code = 0;
  const int32_t vgid = pVgroup->vgId;
  const int8_t  replica = pVgroup->replica;

  if (replica <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
    return -1;
  }

  // Locate the dnode hosting the current leader; stays 0 when no replica reports LEADER.
  int32_t dnodeId = 0;
  for (int i = 0; i < replica; i++) {
    if (pVgroup->vnodeGid[i].syncState != TAOS_SYNC_STATE_LEADER) continue;
    dnodeId = pVgroup->vnodeGid[i].dnodeId;
    break;
  }

  bool       exist = false;
  bool       online = false;
  int64_t    curMs = taosGetTimestampMs();
  SDnodeObj *pLeaderDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pLeaderDnode != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pLeaderDnode, curMs);
    mndReleaseDnode(pMnode, pLeaderDnode);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
          online);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgid, dnodeId);
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));

  code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, dnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));

  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));

  // The database object is acquired only to check that it exists.
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
3144

3145
/* Defined below when TD_ENTERPRISE is not set; otherwise presumably supplied by another translation unit. */
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

/* Public entry point: forwards the request unchanged to the implementation. */
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  const int32_t code = mndProcessVgroupBalanceLeaderMsgImp(pReq);
  return code;
}

#ifndef TD_ENTERPRISE
/* Stub used when TD_ENTERPRISE is not defined: does nothing and reports success. */
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
3152

3153
/*
 * Re-account vnode memory on every dnode in `pArray` for a vgroup change.
 *
 * For each dnode, the memory of the old vgroup layout is subtracted and the
 * memory of the new layout is added when the dnode hosts a replica of the
 * respective vgroup. The dnode's memUsed field is updated in place.
 *
 * Returns TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE (via TAOS_RETURN) as soon as a
 * dnode's available memory minus used memory is not positive, 0 otherwise.
 * Dnodes processed before the failing one keep their updated memUsed.
 */
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  const int32_t numDnodes = (int32_t)taosArrayGetSize(pArray);

  for (int32_t d = 0; d < numDnodes; ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    int64_t    oldMemUsed = 0;
    int64_t    newMemUsed = 0;
    bool       inVgroup = false;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pDnode->id, pDnode->memAvail, pDnode->memUsed);

    // Memory charged to this dnode by the old layout.
    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      oldMemUsed = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      inVgroup = true;
    }

    // Memory charged to this dnode by the new layout.
    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      newMemUsed = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      inVgroup = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pDnode->id, oldMemUsed, newMemUsed);

    pDnode->memUsed = pDnode->memUsed - oldMemUsed + newMemUsed;

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    if (inVgroup) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }

  return 0;
}
3193

3194
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
233,425✔
3195
                                  SArray *pArray, SVgObj *pNewVgroup) {
3196
  int32_t code = 0;
233,425✔
3197
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
233,425✔
3198

3199
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
233,425✔
3200
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
197,350✔
3201
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
197,350✔
3202
    return 0;
197,350✔
3203
  }
3204

3205
  // mndTransSetGroupParallel(pTrans);
3206

3207
  if (pNewDb->cfg.replications == 3) {
36,075✔
3208
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
31,337✔
3209
          pVgroup->vnodeGid[0].dnodeId);
3210

3211
    // add second
3212
    if (pNewVgroup->replica == 1) {
31,337✔
3213
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
31,337✔
3214
    }
3215

3216
    // learner stage
3217
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
30,655✔
3218
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
30,655✔
3219
    TAOS_CHECK_RETURN(
30,655✔
3220
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3221

3222
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
30,655✔
3223

3224
    // follower stage
3225
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
30,655✔
3226
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
30,655✔
3227
    TAOS_CHECK_RETURN(
30,655✔
3228
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3229

3230
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
30,655✔
3231

3232
    // add third
3233
    if (pNewVgroup->replica == 2) {
30,655✔
3234
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
30,655✔
3235
    }
3236

3237
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,958✔
3238
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,958✔
3239
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,958✔
3240
    TAOS_CHECK_RETURN(
27,958✔
3241
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3242
    TAOS_CHECK_RETURN(
27,958✔
3243
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3244
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
27,958✔
3245

3246
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
27,958✔
3247
  } else if (pNewDb->cfg.replications == 1) {
4,738✔
3248
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
3,380✔
3249
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3250
          pVgroup->vnodeGid[2].dnodeId);
3251

3252
    SVnodeGid del1 = {0};
3,380✔
3253
    SVnodeGid del2 = {0};
3,380✔
3254
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
3,380✔
3255
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
3,380✔
3256
    TAOS_CHECK_RETURN(
3,380✔
3257
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3258
    TAOS_CHECK_RETURN(
3,380✔
3259
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3260
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,380✔
3261

3262
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
3,380✔
3263
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
3,380✔
3264
    TAOS_CHECK_RETURN(
3,380✔
3265
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3266
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,380✔
3267
  } else if (pNewDb->cfg.replications == 2) {
1,358✔
3268
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,358✔
3269
          pVgroup->vnodeGid[0].dnodeId);
3270

3271
    // add second
3272
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,358✔
3273

3274
    // learner stage
3275
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,358✔
3276
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,358✔
3277
    TAOS_CHECK_RETURN(
1,358✔
3278
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3279

3280
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,358✔
3281

3282
    // follower stage
3283
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,358✔
3284
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,358✔
3285
    TAOS_CHECK_RETURN(
1,358✔
3286
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3287

3288
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,358✔
3289
  } else {
3290
    return -1;
×
3291
  }
3292

3293
  mndSortVnodeGid(pNewVgroup);
32,696✔
3294

3295
  {
3296
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
32,696✔
3297
    if (pVgRaw == NULL) {
32,696✔
UNCOV
3298
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
3299
      if (terrno != 0) code = terrno;
×
3300
      TAOS_RETURN(code);
×
3301
    }
3302
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
32,696✔
UNCOV
3303
      sdbFreeRaw(pVgRaw);
×
UNCOV
3304
      TAOS_RETURN(code);
×
3305
    }
3306
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
32,696✔
3307
    if (code != 0) {
32,696✔
3308
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
UNCOV
3309
      TAOS_RETURN(code);
×
3310
    }
3311
  }
3312

3313
  TAOS_RETURN(code);
32,696✔
3314
}
3315

3316
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3317
                                      SArray *pArray) {
UNCOV
3318
  int32_t code = 0;
×
UNCOV
3319
  SVgObj  newVgroup = {0};
×
3320
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3321

3322
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
UNCOV
3323
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
UNCOV
3324
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3325
    return 0;
×
3326
  }
3327

UNCOV
3328
  mndTransSetSerial(pTrans);
×
3329

3330
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3331
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3332

UNCOV
3333
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
UNCOV
3334
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3335
          pVgroup->vnodeGid[0].dnodeId);
3336

3337
    // add second
3338
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3339
    // add third
3340
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3341

3342
    // add learner stage
3343
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3344
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3345
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3346
    TAOS_CHECK_RETURN(
×
3347
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
UNCOV
3348
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3349
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3350
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3351
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3352
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3353
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
UNCOV
3354
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3355
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3356

3357
    // check learner
UNCOV
3358
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3359
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3360
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3361
    TAOS_CHECK_RETURN(
×
3362
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
UNCOV
3363
    TAOS_CHECK_RETURN(
×
3364
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3365

3366
    // change raft type
3367
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3368
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3369
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3370
    TAOS_CHECK_RETURN(
×
3371
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3372

3373
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3374

3375
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3376
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3377
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3378
    TAOS_CHECK_RETURN(
×
3379
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3380

3381
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3382

3383
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
UNCOV
3384
    if (pVgRaw == NULL) {
×
3385
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3386
      if (terrno != 0) code = terrno;
×
3387
      TAOS_RETURN(code);
×
3388
    }
3389
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
UNCOV
3390
      sdbFreeRaw(pVgRaw);
×
3391
      TAOS_RETURN(code);
×
3392
    }
UNCOV
3393
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
3394
    if (code != 0) {
×
3395
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3396
             __LINE__);
UNCOV
3397
      TAOS_RETURN(code);
×
3398
    }
UNCOV
3399
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
UNCOV
3400
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3401
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3402

3403
    SVnodeGid del1 = {0};
×
UNCOV
3404
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3405

3406
    TAOS_CHECK_RETURN(
×
3407
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3408

3409
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3410

3411
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3412

3413
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
UNCOV
3414
    if (pVgRaw == NULL) {
×
3415
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3416
      if (terrno != 0) code = terrno;
×
3417
      TAOS_RETURN(code);
×
3418
    }
3419
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
UNCOV
3420
      sdbFreeRaw(pVgRaw);
×
UNCOV
3421
      TAOS_RETURN(code);
×
3422
    }
3423
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
3424
    if (code != 0) {
×
3425
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3426
             __LINE__);
UNCOV
3427
      TAOS_RETURN(code);
×
3428
    }
3429

3430
    SVnodeGid del2 = {0};
×
UNCOV
3431
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3432

3433
    TAOS_CHECK_RETURN(
×
3434
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3435

3436
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3437

3438
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3439

3440
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
UNCOV
3441
    if (pVgRaw == NULL) {
×
3442
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3443
      if (terrno != 0) code = terrno;
×
3444
      TAOS_RETURN(code);
×
3445
    }
3446
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
UNCOV
3447
      sdbFreeRaw(pVgRaw);
×
UNCOV
3448
      TAOS_RETURN(code);
×
3449
    }
UNCOV
3450
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
3451
    if (code != 0) {
×
3452
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3453
             __LINE__);
UNCOV
3454
      TAOS_RETURN(code);
×
3455
    }
3456
  } else {
3457
    return -1;
×
3458
  }
3459

UNCOV
3460
  mndSortVnodeGid(&newVgroup);
×
3461

3462
  {
3463
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
UNCOV
3464
    if (pVgRaw == NULL) {
×
3465
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3466
      if (terrno != 0) code = terrno;
×
3467
      TAOS_RETURN(code);
×
3468
    }
3469
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
UNCOV
3470
      sdbFreeRaw(pVgRaw);
×
UNCOV
3471
      TAOS_RETURN(code);
×
3472
    }
3473
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
UNCOV
3474
    if (code != 0) {
×
UNCOV
3475
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3476
             __LINE__);
UNCOV
3477
      TAOS_RETURN(code);
×
3478
    }
3479
  }
3480

UNCOV
3481
  TAOS_RETURN(code);
×
3482
}
3483

3484
/*
 * Assign sync roles to the first `replica` slots of `pVg`.
 *
 * Every slot becomes a voter, except that a slot hosted on `pLearner`
 * (matched by dnode id) becomes a learner. Pass NULL to make all voters.
 */
static void mndRestoreSetNodeRoles(SVgObj *pVg, const SDnodeObj *pLearner) {
  for (int32_t i = 0; i < pVg->replica; i++) {
    const bool isLearner = (pLearner != NULL) && (pVg->vnodeGid[i].dnodeId == pLearner->id);
    pVg->vnodeGid[i].nodeRole = isLearner ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
  }
}

/*
 * Add to `pTrans` the actions that re-create the vnode of `pVgroup` hosted on
 * `pDnode`, then record the vgroup in the commit log as READY.
 *
 * A private copy of `pVgroup` is edited; `pVgroup` itself is only read.
 *
 * - replica == 1: the single vnode is created as a voter.
 * - replica == 2: the restored vnode is marked learner, the vnode on
 *   `pAnotherDnode` gets an alter-type action, the restored vnode is created,
 *   then all replicas become voters and both dnodes get alter-type actions.
 * - replica == 3: the restored vnode is created as a learner, then promoted
 *   with an alter-type action once all replicas are marked voters.
 * - any other replica count: no vnode actions, only the commit-log record.
 *
 * Returns 0 on success or the first failing step's code.
 */
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  if (newVgroup.replica == 1) {
    // With one replica the only slot is index 0, so no search for pDnode is needed.
    mndRestoreSetNodeRoles(&newVgroup, NULL);
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[0]));
  } else if (newVgroup.replica == 2) {
    mndRestoreSetNodeRoles(&newVgroup, pDnode);
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

    // Roles are re-applied before the create step, as the callee receives a mutable vgroup.
    mndRestoreSetNodeRoles(&newVgroup, pDnode);
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

    mndRestoreSetNodeRoles(&newVgroup, NULL);
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
  } else if (newVgroup.replica == 3) {
    mndRestoreSetNodeRoles(&newVgroup, pDnode);
    TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

    mndRestoreSetNodeRoles(&newVgroup, NULL);
    TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
  }

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3563

UNCOV
3564
/* Placeholder: adds no action to the transaction and always returns 0. */
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;
  return 0;
}
3567

3568
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3569

3570
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
73,153✔
3571
  int32_t         code = 0;
73,153✔
3572
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
73,153✔
3573
  SSdbRaw        *pRaw = mndVgroupActionEncode(pVg);
73,153✔
3574
  if (pRaw == NULL) {
73,153✔
3575
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
3576
    if (terrno != 0) code = terrno;
×
UNCOV
3577
    goto _err;
×
3578
  }
3579
  if ((code = appendActionCb(pTrans, pRaw)) != 0) goto _err;
73,153✔
3580
  code = sdbSetRawStatus(pRaw, vgStatus);
73,153✔
3581
  if (code != 0) {
73,153✔
UNCOV
3582
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
×
UNCOV
3583
    goto _err;
×
3584
  }
3585
  pRaw = NULL;
73,153✔
3586
  TAOS_RETURN(code);
73,153✔
UNCOV
3587
_err:
×
UNCOV
3588
  sdbFreeRaw(pRaw);
×
3589
  TAOS_RETURN(code);
×
3590
}
3591

3592
// Encode pDb as an sdb raw record, append it to pTrans and mark the record with dbStatus.
//
// pTrans   transaction that receives the record
// pDb      database object to encode
// dbStatus sdb status stamped on the encoded record
// stage    TRN_STAGE_COMMIT_ACTION appends to the commit log; any other value appends to the redo log
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // The append failed, so the record is still ours to release.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // From here on the record is referenced by the transaction. It must not be freed
  // locally any more (the other append sites in this file free the raw only when the
  // append itself fails); freeing it here would leave the transaction with a dangling
  // pointer and a second free when the transaction is dropped.
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);
}
3613

3614
// Build and prepare a serial "split-vgroup" transaction that replaces pVgroup with two
// single-replica vgroups, each covering one half of the original hash range.
//
// pMnode   mnode that owns the sdb and the transaction
// pReq     originating request, handed to mndTransCreate
// pDb      database the vgroup belongs to
// pVgroup  vgroup to split; it is only read here, all changes are made on local copies
//
// Returns 0 once the transaction has been prepared, otherwise the error code of the
// first failing step. The dnode array, the transaction handle and the duplicated
// retention array are released on every path.
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  // The dnode array is consumed by the add/remove/alter helpers below; stop early
  // when it could not be built (same check as in mndProcessBalanceVgroupMsg).
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  /*
    if (pDb->cfg.withArbitrator) {
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
      goto _OVER;
    }
  */

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  // Work on a private copy so the caller's vgroup object stays untouched.
  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  if (newVg1.replica == 1) {
    // Single replica: add a second vnode, first as learner, then switch it to voter.
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    // Three replicas: remove one vnode from the group, drop it, and alter the
    // vnodes now in slots 0 and 1.
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // goto _OVER;
  }

  // Disable writes on every remaining vnode before the hash range is changed.
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // newVg1 keeps vnode slot 0 and the lower half of the hash range,
  // newVg2 takes vnode slot 1 and the upper half.
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  // Each bound is halved before the addition, so the midpoint is formed from two halves.
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  // Bounded by newVg2's own replica count, since it is newVg2 that is being printed.
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  // Redo stage: both new vgroups become ready and the source vgroup is marked dropped.
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // The shallow copy shares pDb's array; duplicate it so that the destroy at _OVER
    // only ever releases memory owned by this function.
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3776

3777
// Implementation hook for split-vgroup requests. It is defined outside this spot;
// when TD_ENTERPRISE is not defined, this file provides a stub that returns 0.
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);
3778

3779
// Thin wrapper: forwards the request to mndProcessSplitVgroupMsgImp and returns its result.
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t result = mndProcessSplitVgroupMsgImp(pReq);
  return result;
}
16,821✔
3780

3781
#ifndef TD_ENTERPRISE
3782
// Stub compiled only when TD_ENTERPRISE is not defined: ignores the request and returns 0.
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;  // intentionally unused
  return 0;
}
3783
#endif
3784

3785
// Add to pTrans the steps that move one replica of pVgroup from dnode pSrc to dnode pDst:
// a replica is first added on pDst, then one is removed from pSrc, and the resulting
// vgroup is appended to the commit log with status READY.
//
// The work is done on a local copy; pVgroup itself is not modified.
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t code = 0;
  int32_t idx = 0;
  SVgObj  movedVg = {0};
  memcpy(&movedVg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", movedVg.vgId, movedVg.replica);
  idx = 0;
  while (idx < movedVg.replica) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
    ++idx;
  }

  // Grow onto the destination first, then shrink away from the source.
  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &movedVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &movedVg, pSrc->id));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(&movedVg);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    // Only a raw that was not appended is released here.
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", movedVg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", movedVg.vgId, movedVg.replica);
  idx = 0;
  while (idx < movedVg.replica) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, idx, movedVg.vnodeGid[idx].dnodeId);
    ++idx;
  }
  TAOS_RETURN(code);
}
3822

3823
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
14,448✔
3824
                                            SHashObj *pBalancedVgroups) {
3825
  void   *pIter = NULL;
14,448✔
3826
  int32_t code = -1;
14,448✔
3827
  SSdb   *pSdb = pMnode->pSdb;
14,448✔
3828

3829
  while (1) {
8,903✔
3830
    SVgObj *pVgroup = NULL;
23,351✔
3831
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
23,351✔
3832
    if (pIter == NULL) break;
23,351✔
3833
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
23,351✔
3834
      sdbRelease(pSdb, pVgroup);
8,210✔
3835
      continue;
8,210✔
3836
    }
3837

3838
    bool existInSrc = false;
15,141✔
3839
    bool existInDst = false;
15,141✔
3840
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
43,970✔
3841
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
28,829✔
3842
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
28,829✔
3843
      if (pGid->dnodeId == pDst->id) existInDst = true;
28,829✔
3844
    }
3845

3846
    if (!existInSrc || existInDst) {
15,141✔
3847
      sdbRelease(pSdb, pVgroup);
693✔
3848
      continue;
693✔
3849
    }
3850

3851
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
14,448✔
3852
    if (pDb == NULL) {
14,448✔
3853
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
3854
      if (terrno != 0) code = terrno;
×
UNCOV
3855
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
UNCOV
3856
      goto _OUT;
×
3857
    }
3858

3859
    if (pDb->cfg.withArbitrator) {
14,448✔
UNCOV
3860
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
UNCOV
3861
      goto _OUT;
×
3862
    }
3863

3864
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
14,448✔
3865
    if (code == 0) {
14,448✔
3866
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
14,448✔
3867
    }
3868

3869
  _OUT:
14,448✔
3870
    mndReleaseDb(pMnode, pDb);
14,448✔
3871
    sdbRelease(pSdb, pVgroup);
14,448✔
3872
    sdbCancelFetch(pSdb, pIter);
14,448✔
3873
    break;
14,448✔
3874
  }
3875

3876
  return code;
14,448✔
3877
}
3878

3879
// Build and prepare a serial "balance-vgroup" transaction.
//
// Each round sorts pArray with mndCompareDnodeVnodes, takes the last element as source
// and the first as destination, and - while the projected source score is still not
// below the projected destination score - moves one vgroup between them.
//
// pMnode  mnode that owns the sdb and the transaction
// pReq    originating request, handed to mndTransCreate
// pArray  dnode array; it is re-sorted in place and its vnode counters are updated
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction with at least one move was
// prepared, 0 when nothing had to be moved, otherwise an error code.
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    // Report a real error code instead of the bare -1 initial value.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      // Arguments follow the label order: vnodes, others, support, score.
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Scores as they would be after moving one vnode from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        // Keep the in-memory counters in step with the planned move for the next round.
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Capture the prepare result: otherwise a failure here would return whatever code
    // was left by the loop above, which is 0 after a successful round.
    if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3947

3948
// Handle a balance-vgroup request.
//
// Steps: deserialize the request, check the MND_OPER_BALANCE_VGROUP privilege, refuse
// when any mount exists or any dnode is offline, build the dnode array, and - when at
// least two dnodes are present - run mndBalanceVgroup. An audit record is written when
// tsAuditLevel is at least AUDIT_LEVEL_CLUSTER.
//
// Returns the result of mndBalanceVgroup (which may be TSDB_CODE_ACTION_IN_PROGRESS),
// 0 when there is nothing to do, or the error code of the failing check.
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();
  int64_t tss = taosGetTimestampMs();  // start time used for the audit duration

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // Every dnode must be online before any vgroup is moved.
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Describe the code assigned just above; terrno has not been set on this path,
      // so a terrno-based message would be unrelated to this failure.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;  // milliseconds -> seconds
    auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
4017

4018
// True when pVgroup is not flagged isTsma and its dbUid equals dbUid.
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
106,139,255✔
4019

4020
// True when one of the first `replica` vnode slots of pVgroup is placed on dnode dnodeId.
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool found = false;
  for (int slot = 0; !found && slot < pVgroup->replica; slot++) {
    found = (pVgroup->vnodeGid[slot].dnodeId == dnodeId);
  }
  return found;
}
4026

4027
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
124,539✔
4028
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4029
                                     ETriggerType triggerType) {
4030
  SCompactVnodeReq compactReq = {0};
124,539✔
4031
  compactReq.dbUid = pDb->uid;
124,539✔
4032
  compactReq.compactStartTime = compactTs;
124,539✔
4033
  compactReq.tw = tw;
124,539✔
4034
  compactReq.metaOnly = metaOnly;
124,539✔
4035
  compactReq.force = force;
124,539✔
4036
  compactReq.optrType = type;
124,539✔
4037
  compactReq.triggerType = triggerType;
124,539✔
4038
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
124,539✔
4039

4040
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
124,539✔
4041
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
124,539✔
4042
  if (contLen < 0) {
124,539✔
4043
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
4044
    return NULL;
×
4045
  }
4046
  contLen += sizeof(SMsgHead);
124,539✔
4047

4048
  void *pReq = taosMemoryMalloc(contLen);
124,539✔
4049
  if (pReq == NULL) {
124,539✔
UNCOV
4050
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4051
    return NULL;
×
4052
  }
4053

4054
  SMsgHead *pHead = pReq;
124,539✔
4055
  pHead->contLen = htonl(contLen);
124,539✔
4056
  pHead->vgId = htonl(pVgroup->vgId);
124,539✔
4057

4058
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
124,539✔
UNCOV
4059
    taosMemoryFree(pReq);
×
UNCOV
4060
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
4061
    return NULL;
×
4062
  }
4063
  *pContLen = contLen;
124,539✔
4064
  return pReq;
124,539✔
4065
}
4066

4067
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
55,544✔
4068
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4069
                                        ETriggerType triggerType) {
4070
  int32_t      code = 0;
55,544✔
4071
  STransAction action = {0};
55,544✔
4072
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
55,544✔
4073

4074
  int32_t contLen = 0;
55,544✔
4075
  void   *pReq =
4076
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
55,544✔
4077
  if (pReq == NULL) {
55,544✔
UNCOV
4078
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
4079
    if (terrno != 0) code = terrno;
×
4080
    TAOS_RETURN(code);
×
4081
  }
4082

4083
  action.pCont = pReq;
55,544✔
4084
  action.contLen = contLen;
55,544✔
4085
  action.msgType = TDMT_VND_COMPACT;
55,544✔
4086

4087
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
55,544✔
UNCOV
4088
    taosMemoryFree(pReq);
×
UNCOV
4089
    TAOS_RETURN(code);
×
4090
  }
4091

4092
  TAOS_RETURN(code);
55,544✔
4093
}
4094

4095
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
55,544✔
4096
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4097
                                    ETriggerType triggerType) {
4098
  TAOS_CHECK_RETURN(
55,544✔
4099
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4100
  return 0;
55,544✔
4101
}
4102

4103
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
68,995✔
4104
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4105
  int32_t      code = 0;
68,995✔
4106
  STransAction action = {0};
68,995✔
4107
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
68,995✔
4108

4109
  int32_t contLen = 0;
68,995✔
4110
  // reuse SCompactVnodeReq as SVTrimDbReq
4111
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
68,995✔
4112
  if (pReq == NULL) {
68,995✔
UNCOV
4113
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
4114
    if (terrno != 0) code = terrno;
×
4115
    TAOS_RETURN(code);
×
4116
  }
4117

4118
  action.pCont = pReq;
68,995✔
4119
  action.contLen = contLen;
68,995✔
4120
  action.msgType = TDMT_VND_TRIM;
68,995✔
4121

4122
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
68,995✔
UNCOV
4123
    taosMemoryFree(pReq);
×
UNCOV
4124
    TAOS_RETURN(code);
×
4125
  }
4126

4127
  TAOS_RETURN(code);
68,995✔
4128
}
4129

4130
// Handle a request that sets the keepVersion of one vgroup.
//
// Steps: deserialize the request, check the MND_OPER_WRITE_DB privilege, acquire the
// vgroup, create a serial transaction, append the updated vgroup (new keepVersion and
// keepVersionTime) to the commit log, append one TDMT_VND_SET_KEEP_VERSION redo action
// per replica, then prepare the transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
// otherwise the error code of the failing step. The vgroup reference and the
// transaction handle are released at _OVER on every path.
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Update keepVersion on a local copy of the vgroup object
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Queue the updated vgroup row in the transaction's commit log
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  // Use the returned code rather than terrno, which may still be 0 on failure.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);  // not appended, so still owned here
    goto _OVER;
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    // The raw has already been appended to the transaction; it must not be freed
    // here as well, or dropping the transaction at _OVER would release it twice.
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    // A negative length must not flow into the allocation size below.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Header fields are stored in network byte order.
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    // Use the returned code rather than terrno, which may still be 0 on failure.
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  // Single release point for the vgroup reference; it is NULL when the request failed
  // before the vgroup was acquired.
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc