• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.86
/source/dnode/mnode/impl/src/mndVgroup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndVgroup.h"
18
#include "audit.h"
19
#include "mndArbGroup.h"
20
#include "mndDb.h"
21
#include "mndDnode.h"
22
#include "mndEncryptAlgr.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTopic.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "tmisce.h"
32

33
#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2
34
#define VGROUP_VER_NUMBER                VGROUP_VER_COMPAT_MOUNT_KEEP_VER
35
#define VGROUP_RESERVE_SIZE              60
36
// since 3.3.6.32/3.3.8.6 mountId + keepVersion + keepVersionTime + VGROUP_RESERVE_SIZE = 4 + 8 + 8 + 60 = 80
37
#define DLEN_AFTER_SYNC_CONF_CHANGE_VER 80
38

39
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
40
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
41
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
42
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
43

44
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
45
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
46
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
48

49
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
50
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
51
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
52
static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq);
53
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq);
54

55
/**
 * Registers everything the vgroup module needs on the mnode: message handlers,
 * "show" handlers and the SDB_VGROUP table description.
 *
 * @param pMnode mnode that receives the handler registrations; its pSdb gets the table.
 * @return the value returned by sdbSetTable() for the vgroup table.
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  // Response messages that are all dispatched to mndTransProcessRsp (registration order kept).
  const int32_t transRspTypes[] = {
      TDMT_DND_CREATE_VNODE_RSP,
      TDMT_VND_ALTER_REPLICA_RSP,
      TDMT_VND_ALTER_CONFIG_RSP,
      TDMT_VND_ALTER_CONFIRM_RSP,
      TDMT_VND_SET_KEEP_VERSION_RSP,
      TDMT_VND_ALTER_HASHRANGE_RSP,
      TDMT_DND_DROP_VNODE_RSP,
      TDMT_VND_COMPACT_RSP,
      TDMT_VND_SCAN_RSP,
      TDMT_VND_DISABLE_WRITE_RSP,
      TDMT_SYNC_FORCE_FOLLOWER_RSP,
      TDMT_VND_ALTER_ELECTBASELINE_RSP,
      TDMT_DND_ALTER_VNODE_TYPE_RSP,
      TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP,
      TDMT_SYNC_CONFIG_CHANGE_RSP,
  };
  const int32_t numOfTransRsp = (int32_t)(sizeof(transRspTypes) / sizeof(transRspTypes[0]));

  SSdbTable table = {0};
  table.sdbType = SDB_VGROUP;
  table.keyType = SDB_KEY_INT32;
  table.encodeFp = (SdbEncodeFp)mndVgroupActionEncode;
  table.decodeFp = (SdbDecodeFp)mndVgroupActionDecode;
  table.insertFp = (SdbInsertFp)mndVgroupActionInsert;
  table.updateFp = (SdbUpdateFp)mndVgroupActionUpdate;
  table.deleteFp = (SdbDeleteFp)mndVgroupActionDelete;
  table.validateFp = (SdbValidateFp)mndNewVgActionValidate;

  for (int32_t i = 0; i < numOfTransRsp; ++i) {
    mndSetMsgHandle(pMnode, transRspTypes[i], mndTransProcessRsp);
  }

  // Requests handled by this module.
  mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq);

  // "show vgroups" / "show vnodes" retrieval plus iterator cleanup.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, table);
}
98

99
/* Counterpart of mndInitVgroup; intentionally does nothing. */
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}
400,214✔
100

101
/**
 * Serializes a vgroup object into a newly allocated sdb raw record
 * (type SDB_VGROUP, version VGROUP_VER_NUMBER).
 *
 * Field order: vgId, createdTime, updateTime, version, hashBegin, hashEnd, dbName,
 * dbUid, isTsma, replica, one dnodeId per replica, syncConfChangeVer, mountVgId,
 * keepVersion, keepVersionTime, then VGROUP_RESERVE_SIZE reserved bytes.
 *
 * @param pVgroup vgroup to encode.
 * @return the raw record on success; NULL on failure, with terrno left non-zero
 *         and the partially built record released.
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  // NOTE(review): code/lino are not used directly below; presumably the SDB_SET_* macros
  // reference them - confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default; cleared only after every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  int32_t offset = 0;
  SDB_SET_INT32(pRaw, offset, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, offset, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, offset, pVgroup->isTsma, _OVER)
  SDB_SET_INT8(pRaw, offset, pVgroup->replica, _OVER)
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, offset, pVgroup->vnodeGid[r].dnodeId, _OVER)
  }
  SDB_SET_INT32(pRaw, offset, pVgroup->syncConfChangeVer, _OVER)
  SDB_SET_INT32(pRaw, offset, pVgroup->mountVgId, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->keepVersion, _OVER)
  SDB_SET_INT64(pRaw, offset, pVgroup->keepVersionTime, _OVER)
  SDB_SET_RESERVE(pRaw, offset, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, offset, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
143

144
/**
 * Rebuilds a vgroup row from an sdb raw record.
 *
 * Accepts soft versions 1..VGROUP_VER_NUMBER. The trailing fields (syncConfChangeVer,
 * mountVgId, keepVersion, keepVersionTime, reserve) are read only when enough bytes
 * remain in the record, so shorter records decode without them. Records older than
 * VGROUP_VER_COMPAT_MOUNT_KEEP_VER get keepVersion = -1 and keepVersionTime = 0.
 *
 * @param pRaw raw record to decode.
 * @return the allocated row on success; NULL on failure, with terrno left non-zero.
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly below; presumably the SDB_GET_* macros
  // reference them - confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;
  int8_t   sver = 0;

  // Pessimistic default; cleared only after the whole record has been decoded.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (!(sver >= 1 && sver <= VGROUP_VER_NUMBER)) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  int32_t offset = 0;
  SDB_GET_INT32(pRaw, offset, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, offset, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, offset, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, offset, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, offset, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, offset, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, offset, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, offset, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, offset, &pVgroup->isTsma, _OVER)
  SDB_GET_INT8(pRaw, offset, &pVgroup->replica, _OVER)
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[r];
    SDB_GET_INT32(pRaw, offset, &pGid->dnodeId, _OVER)
    // A single-replica vgroup is marked as leader right away.
    if (pVgroup->replica == 1) {
      pGid->syncState = TAOS_SYNC_STATE_LEADER;
    }
    pGid->snapSeq = -1;
  }

  // Optional tail: each field is read only if it still fits in front of the reserve area.
  if (offset + 2 * sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, offset, &pVgroup->syncConfChangeVer, _OVER)
  }

  // Bytes left after syncConfChangeVer; consulted below for pre-v2 records.
  int32_t tailLen = pRaw->dataLen - offset;

  if (offset + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT32(pRaw, offset, &pVgroup->mountVgId, _OVER)
  }
  if (offset + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, offset, &pVgroup->keepVersion, _OVER)
  }
  if (offset + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_INT64(pRaw, offset, &pVgroup->keepVersionTime, _OVER)
  }
  if (offset + VGROUP_RESERVE_SIZE <= pRaw->dataLen) {
    SDB_GET_RESERVE(pRaw, offset, VGROUP_RESERVE_SIZE, _OVER)
  }

  if (sver < VGROUP_VER_COMPAT_MOUNT_KEEP_VER) {
    // Pre-v2 record: override the keep-version fields, and reset mountVgId when the
    // tail length equals DLEN_AFTER_SYNC_CONF_CHANGE_VER.
    if (tailLen == DLEN_AFTER_SYNC_CONF_CHANGE_VER) {
      pVgroup->mountVgId = 0;
    }
    pVgroup->keepVersion = -1;
    pVgroup->keepVersionTime = 0;
  }

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRow;
  }

  mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
222

223
/**
 * Validation callback for a new vgroup raw record inside a transaction.
 *
 * Decodes the record and compares its vgId against sdbGetMaxId(SDB_VGROUP).
 *
 * @param pMnode mnode owning the sdb.
 * @param pTrans transaction being validated (its id is used for logging only).
 * @param pRaw   raw vgroup record to check.
 * @return 0 when the record is acceptable; terrno or TSDB_CODE_MND_RETURN_VALUE_NULL
 *         when decoding fails; -1 when the max id is greater than the record's vgId.
 */
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int     code = -1;

  SSdbRow *pRow = mndVgroupActionDecode(pRaw);
  if (pRow != NULL) {
    pVgroup = sdbGetRowObj(pRow);
  }
  if (pRow == NULL || pVgroup == NULL) {
    // Prefer the specific error left in terrno, fall back to the generic one.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  if (maxVgId <= pVgroup->vgId) {
    code = 0;
  } else {
    // code stays at its initial -1 on this path.
    mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
  }

_OVER:
  // The decoded row is temporary: run the delete action on it, then free it.
  if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
  taosMemoryFreeClear(pRow);
  TAOS_RETURN(code);
}
254

255
/* sdb insert callback for vgroup rows: only traces the event and reports success. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
259

260
/* sdb delete callback for vgroup rows: only traces the event and reports success. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}
264

265
/**
 * sdb update callback: merges a new vgroup row into the existing one.
 *
 * Persistent attributes are taken from pNew. Per-vnode runtime sync fields are carried
 * from pOld into pNew's vnodeGid entries with the same dnodeId, and the whole vnodeGid
 * array of pNew is then copied into pOld. pNew's statistics fields are overwritten with
 * pOld's values.
 *
 * @param pSdb sdb instance (unused).
 * @param pOld row currently stored; updated in place.
 * @param pNew incoming row; its vnodeGid entries and statistics are modified too.
 * @return always 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Scalar attributes taken from the new row.
  pOld->updateTime = pNew->updateTime;
  pOld->version = pNew->version;
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  // NOTE(review): pOld->replica is overwritten before the matching loop below, so the
  // inner loop is bounded by the NEW replica count - confirm this is intended.
  pOld->replica = pNew->replica;
  pOld->isTsma = pNew->isTsma;
  pOld->keepVersion = pNew->keepVersion;
  pOld->keepVersionTime = pNew->keepVersionTime;
  pOld->syncConfChangeVer = pNew->syncConfChangeVer;
  tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN);

  // Statistics are kept from the stored row.
  pNew->numOfTables = pOld->numOfTables;
  pNew->numOfTimeSeries = pOld->numOfTimeSeries;
  pNew->totalStorage = pOld->totalStorage;
  pNew->compStorage = pOld->compStorage;
  pNew->pointsWritten = pOld->pointsWritten;
  pNew->compact = pOld->compact;

  // Carry runtime sync fields over for vnodes on the same dnode.
  for (int32_t n = 0; n < pNew->replica; ++n) {
    SVnodeGid *pDst = &pNew->vnodeGid[n];
    for (int32_t o = 0; o < pOld->replica; ++o) {
      SVnodeGid *pSrc = &pOld->vnodeGid[o];
      if (pDst->dnodeId != pSrc->dnodeId) continue;

      pDst->syncState = pSrc->syncState;
      pDst->syncRestore = pSrc->syncRestore;
      pDst->syncCanRead = pSrc->syncCanRead;
      pDst->syncAppliedIndex = pSrc->syncAppliedIndex;
      pDst->syncCommitIndex = pSrc->syncCommitIndex;
      pDst->bufferSegmentUsed = pSrc->bufferSegmentUsed;
      pDst->bufferSegmentSize = pSrc->bufferSegmentSize;
      pDst->learnerProgress = pSrc->learnerProgress;
      pDst->snapSeq = pSrc->snapSeq;
      pDst->syncTotalIndex = pSrc->syncTotalIndex;
    }
  }

  // Install the merged vnode list (full fixed-size array) into the stored row.
  memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
  return 0;
}
304

305
/**
 * Acquires the vgroup with the given id from the sdb.
 *
 * @param pMnode mnode owning the sdb.
 * @param vgId   id of the vgroup to look up.
 * @return the vgroup, or NULL; a TSDB_CODE_SDB_OBJ_NOT_THERE error is translated
 *         to TSDB_CODE_MND_VGROUP_NOT_EXIST in terrno.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}
313

314
/* Hands a vgroup (as obtained from mndAcquireVgroup) back to the mnode's sdb via sdbRelease. */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}
120,347,788✔
318

319
/**
 * Builds a serialized SCreateVnodeReq for creating a vnode of pVgroup on pDnode.
 *
 * The request is filled from the database config and the vgroup, the vgroup members
 * are split into voter and learner replica lists, and pVgroup->syncConfChangeVer is
 * incremented (this happens even if the function fails afterwards).
 *
 * @param pMnode   mnode used to look up dnodes.
 * @param pDnode   target dnode; its position in the voter or learner list becomes
 *                 selfIndex / learnerSelfIndex.
 * @param pDb      database providing the configuration.
 * @param pVgroup  vgroup to create the vnode for.
 * @param pContLen receives the length of the returned buffer on success.
 * @return buffer allocated with taosMemoryMalloc, or NULL on failure. terrno is set
 *         here for the self-index and serialization failures; the dnode-lookup
 *         failure path returns NULL without setting it in this function.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
      .vgVersion = pVgroup->version,
      .numOfStables = pDb->cfg.numOfStables,
      .buffer = pDb->cfg.buffer,
      .pageSize = pDb->cfg.pageSize,
      .pages = pDb->cfg.pages,
      .cacheLastSize = pDb->cfg.cacheLastSize,
      .daysPerFile = pDb->cfg.daysPerFile,
      .daysToKeep0 = pDb->cfg.daysToKeep0,
      .daysToKeep1 = pDb->cfg.daysToKeep1,
      .daysToKeep2 = pDb->cfg.daysToKeep2,
      .keepTimeOffset = pDb->cfg.keepTimeOffset,
      .ssChunkSize = pDb->cfg.ssChunkSize,
      .ssKeepLocal = pDb->cfg.ssKeepLocal,
      .ssCompact = pDb->cfg.ssCompact,
      .minRows = pDb->cfg.minRows,
      .maxRows = pDb->cfg.maxRows,
      .walFsyncPeriod = pDb->cfg.walFsyncPeriod,
      .walLevel = pDb->cfg.walLevel,
      .precision = pDb->cfg.precision,
      .compression = pDb->cfg.compression,
      .strict = pDb->cfg.strict,
      .cacheLast = pDb->cfg.cacheLast,
      .replica = 0,
      .learnerReplica = 0,
      .selfIndex = -1,
      .learnerSelfIndex = -1,
      .hashBegin = pVgroup->hashBegin,
      .hashEnd = pVgroup->hashEnd,
      .hashMethod = pDb->cfg.hashMethod,
      .numOfRetensions = pDb->cfg.numOfRetensions,
      .pRetensions = pDb->cfg.pRetensions,
      .isTsma = pVgroup->isTsma,
      .pTsma = pVgroup->pTsma,
      .walRetentionPeriod = pDb->cfg.walRetentionPeriod,
      .walRetentionSize = pDb->cfg.walRetentionSize,
      .walRollPeriod = pDb->cfg.walRollPeriod,
      .walSegmentSize = pDb->cfg.walSegmentSize,
      .sstTrigger = pDb->cfg.sstTrigger,
      .hashPrefix = pDb->cfg.hashPrefix,
      .hashSuffix = pDb->cfg.hashSuffix,
      .tsdbPageSize = pDb->cfg.tsdbPageSize,
      .changeVersion = ++(pVgroup->syncConfChangeVer),
      .isAudit = pDb->cfg.isAudit ? 1 : 0,
      .allowDrop = pDb->cfg.allowDrop,
  };
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  // createReq.encryptAlgorithm = pDb->cfg.encryptAlgorithm;
  memset(createReq.encryptAlgrName, 0, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pDb->cfg.encryptAlgorithm > 0) {
    mndGetEncryptOsslAlgrNameById(pMnode, pDb->cfg.encryptAlgorithm, createReq.encryptAlgrName);
  }

  // Split the vgroup members into the voter list and the learner list.
  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);
    SReplica     *pReplica = isVoter ? &createReq.replicas[createReq.replica]
                                     : &createReq.learnerReplicas[createReq.learnerReplica];

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // The self index is the position inside the list the target dnode landed in.
    const int32_t isSelf = (pDnode->id == pVgid->dnodeId);
    if (isVoter) {
      if (isSelf) {
        createReq.selfIndex = createReq.replica;
      }
      createReq.replica++;
    } else {
      if (isSelf) {
        createReq.learnerSelfIndex = createReq.learnerReplica;
      }
      createReq.learnerReplica++;
    }
  }

  // The target dnode must be a member of the vgroup.
  if (createReq.selfIndex == -1 && createReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  createReq.changeVersion = pVgroup->syncConfChangeVer;

  mInfo(
      "vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerSelfIndex,
      createReq.strict, createReq.changeVersion);
  for (int32_t i = 0; i < createReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
  }
  for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
          createReq.learnerReplicas[i].port);
  }

  // First pass computes the size, second pass fills the buffer.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSCreateVnodeReq(pReq, contLen, &createReq) < 0) {
    terrno = TSDB_CODE_APP_ERROR;
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize create vnode req,since %s", createReq.vgId, terrstr());
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
454

455
/**
 * Builds an "alter vnode config" message: an SMsgHead followed by the serialized
 * SAlterVnodeConfigReq, filled from the database config and the vgroup version.
 *
 * @param pMnode   mnode (unused here).
 * @param pDb      database providing the configuration values.
 * @param pVgroup  vgroup whose vgId/version go into the message.
 * @param pContLen receives the total message length (head + payload) on success.
 * @return buffer allocated with taosMemoryMalloc, or NULL with terrno set on failure.
 */
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeConfigReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.pages = pDb->cfg.pages;
  alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.keepTimeOffset = pDb->cfg.keepTimeOffset;
  alterReq.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLast = pDb->cfg.cacheLast;
  alterReq.sttTrigger = pDb->cfg.sstTrigger;
  alterReq.minRows = pDb->cfg.minRows;
  alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
  alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
  alterReq.ssKeepLocal = pDb->cfg.ssKeepLocal;
  alterReq.ssCompact = pDb->cfg.ssCompact;
  alterReq.allowDrop = (int8_t)pDb->cfg.allowDrop;

  mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);

  // Size of the serialized payload only (without the message head).
  int32_t bodyLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload starts after the head, so only bodyLen bytes are available there.
  // Passing the full contLen would overstate the destination capacity by sizeof(SMsgHead).
  if (tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq) < 0) {
    taosMemoryFree(pReq);
    mError("vgId:%d, failed to serialize alter vnode config req,since %s", pVgroup->vgId, terrstr());
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
506

507
/**
 * Builds a serialized SAlterVnodeReplicaReq describing the current membership of
 * pVgroup, addressed to the member located on dnodeId.
 *
 * pVgroup->syncConfChangeVer is incremented and sent as changeVersion (the increment
 * happens even if the function fails afterwards).
 *
 * @param pMnode   mnode used to look up dnodes.
 * @param pDb      database providing the "strict" setting.
 * @param pVgroup  vgroup whose members are listed.
 * @param dnodeId  dnode the request is built for; must host a member of the vgroup.
 * @param pContLen receives the length of the returned buffer on success.
 * @return buffer allocated with taosMemoryMalloc, or NULL on failure. terrno is set
 *         here for the self-index and serialization failures; the dnode-lookup
 *         failure path returns NULL without setting it in this function.
 */
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SAlterVnodeReplicaReq alterReq = {0};
  alterReq.vgId = pVgroup->vgId;
  alterReq.strict = pDb->cfg.strict;
  alterReq.selfIndex = -1;
  alterReq.learnerSelfIndex = -1;
  alterReq.changeVersion = ++(pVgroup->syncConfChangeVer);

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Reserve the next slot in the matching list.
    SReplica *pReplica = NULL;
    if (isVoter) {
      pReplica = &alterReq.replicas[alterReq.replica++];
    } else {
      pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica++];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // NOTE(review): the self index is the position in pVgroup->vnodeGid (v), not the
    // position inside replicas[]/learnerReplicas[] as in mndBuildCreateVnodeReq - confirm
    // this matches what the receiver expects.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        alterReq.selfIndex = v;
      } else {
        alterReq.learnerSelfIndex = v;
      }
    }
  }

  mInfo(
      "vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
      "changeVersion:%d",
      alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex,
      alterReq.strict, alterReq.changeVersion);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn,
          alterReq.learnerReplicas[i].port);
  }

  // The addressed dnode must host one of the members.
  if (alterReq.selfIndex == -1 && alterReq.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass computes the size, second pass fills the buffer.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", alterReq.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
589

UNCOV
590
/**
 * Builds a serialized SCheckLearnCatchupReq listing the voters and learners of pVgroup,
 * addressed to the member located on dnodeId. The request is serialized with
 * tSerializeSAlterVnodeReplicaReq.
 *
 * @param pMnode   mnode used to look up dnodes.
 * @param pDb      database providing the "strict" setting.
 * @param pVgroup  vgroup whose members are listed.
 * @param dnodeId  dnode the request is built for; must host a member of the vgroup.
 * @param pContLen receives the length of the returned buffer on success.
 * @return buffer allocated with taosMemoryMalloc, or NULL on failure. terrno is set
 *         here for the self-index and serialization failures; the dnode-lookup
 *         failure path returns NULL without setting it in this function.
 */
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen) {
  SCheckLearnCatchupReq req = {0};
  req.vgId = pVgroup->vgId;
  req.strict = pDb->cfg.strict;
  req.selfIndex = -1;
  req.learnerSelfIndex = -1;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid    *pVgid = &pVgroup->vnodeGid[v];
    const int32_t isVoter = (pVgid->nodeRole == TAOS_SYNC_ROLE_VOTER);

    // Reserve the next slot in the matching list.
    SReplica *pReplica = NULL;
    if (isVoter) {
      pReplica = &req.replicas[req.replica++];
    } else {
      pReplica = &req.learnerReplicas[req.learnerReplica++];
    }

    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) return NULL;

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);

    // The self index recorded here is the position in pVgroup->vnodeGid.
    if (dnodeId == pVgid->dnodeId) {
      if (isVoter) {
        req.selfIndex = v;
      } else {
        req.learnerSelfIndex = v;
      }
    }
  }

  mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
        req.vgId, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
  for (int32_t i = 0; i < req.replica; ++i) {
    mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
  }

  // The addressed dnode must host one of the members.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    terrno = TSDB_CODE_APP_ERROR;
    return NULL;
  }

  // First pass computes the size, second pass fills the buffer.
  int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
  if (contLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req) < 0) {
    mError("vgId:%d, failed to serialize alter vnode req,since %s", req.vgId, terrstr());
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
667

668
// Builds the serialized body of a "disable vnode write" request for vgroup `vgId`.
// The request is always built with disable = 1; pMnode and pDb are accepted but not read here.
// Returns a buffer obtained from taosMemoryMalloc and stores its length in *pContLen.
// On any failure returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY and leaves *pContLen untouched.
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
  SDisableVnodeWriteReq req = {0};
  req.vgId = vgId;
  req.disable = 1;

  mInfo("vgId:%d, build disable vnode write req", vgId);

  // The call with a NULL buffer yields the size that is allocated below.
  int32_t len = tSerializeSDisableVnodeWriteReq(NULL, 0, &req);
  void   *pBuf = (len < 0) ? NULL : taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDisableVnodeWriteReq(pBuf, len, &req) < 0) {
    mError("vgId:%d, failed to serialize disable vnode write req,since %s", vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = len;
  return pBuf;
}
696

697
// Builds the serialized body of an "alter vnode hash range" request that moves the hash range
// [pVgroup->hashBegin, pVgroup->hashEnd] from vgroup `srcVgId` to vgroup pVgroup->vgId.
// Side effect: pVgroup->syncConfChangeVer is incremented and the new value is stored in the
// request's changeVersion field. pMnode is not read here.
// Returns the allocated buffer and writes its length to *pContLen; on failure returns NULL with
// terrno set to TSDB_CODE_OUT_OF_MEMORY.
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeHashRangeReq req = {0};
  req.srcVgId = srcVgId;
  req.dstVgId = pVgroup->vgId;
  req.hashBegin = pVgroup->hashBegin;
  req.hashEnd = pVgroup->hashEnd;
  req.changeVersion = ++(pVgroup->syncConfChangeVer);

  mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
        pVgroup->hashBegin, pVgroup->hashEnd);

  // Size probe: the result of the NULL-buffer call is used as the allocation size.
  int32_t len = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &req);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t written = tSerializeSAlterVnodeHashRangeReq(pBuf, len, &req);
  if (written < 0) {
    mError("vgId:%d, failed to serialize alter vnode hashrange req,since %s", srcVgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = len;
  return pBuf;
}
729

730
// Builds the serialized body of a "drop vnode" request asking dnode pDnode->id to drop its
// replica of vgroup pVgroup->vgId, tagged with the database name and uid taken from pDb.
// pMnode is not read here.
// Returns the allocated buffer and writes its length to *pContLen; on failure returns NULL with
// terrno set to TSDB_CODE_OUT_OF_MEMORY.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq req = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(req.db, pDb->name, TSDB_DB_FNAME_LEN);

  mInfo("vgId:%d, build drop vnode req", req.vgId);

  // Size probe: the result of the NULL-buffer call is used as the allocation size.
  int32_t len = tSerializeSDropVnodeReq(NULL, 0, &req);
  void   *pBuf = (len < 0) ? NULL : taosMemoryMalloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSDropVnodeReq(pBuf, len, &req) < 0) {
    mError("vgId:%d, failed to serialize drop vnode req,since %s", req.vgId, terrstr());
    taosMemoryFree(pBuf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pContLen = len;
  return pBuf;
}
759

760
// sdbTraverse callback over SDB_DNODE objects: zeroes the per-dnode counters numOfVnodes and
// numOfOtherNodes so that a following traversal can recompute them. p1..p3 are unused.
// Always returns true (presumably "keep traversing" — confirm against sdbTraverse).
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pNode = (SDnodeObj *)pObj;

  pNode->numOfOtherNodes = 0;
  pNode->numOfVnodes = 0;

  return true;
}
766

767
// sdbTraverse callback over SDB_DNODE objects that collects candidate dnodes.
//   p1: SArray receiving a copy of every accepted SDnodeObj
//   p2: pointer to an int32_t dnode id that must be skipped
//   p3: optional SArray of int32_t dnode ids; when non-NULL only listed dnodes are considered,
//       and an empty list rejects every dnode (TS-6191)
// For each considered dnode the callback refreshes numOfVnodes and memUsed, bumps numOfOtherNodes
// when the dnode also hosts an mnode, and pushes it to p1 only if it is online and supports vnodes.
// Returns false only when the push into p1 fails; true otherwise.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;
  int32_t    skipId = *(int32_t *)p2;
  SArray    *pAllowed = p3;

  if (pDnode->id == skipId) {
    return true;
  }

  if (pAllowed != NULL) {
    // With an empty list the loop body never runs, so the dnode is rejected (TS-6191).
    int32_t nAllowed = taosArrayGetSize(pAllowed);
    bool    listed = false;
    for (int32_t k = 0; k < nAllowed && !listed; ++k) {
      listed = (*(int32_t *)taosArrayGet(pAllowed, k) == pDnode->id);
    }
    if (!listed) {
      return true;
    }
  }

  int64_t nowMs = taosGetTimestampMs();
  bool    isOnline = mndIsDnodeOnline(pDnode, nowMs);
  bool    hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);

  mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
        pDnode->numOfVnodes, pDnode->numOfSupportVnodes, hostsMnode, isOnline, pDnode->memAvail, pDnode->memUsed);

  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (!isOnline || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
813

UNCOV
814
// Returns true when `dnodeId` equals one of the int32_t entries stored in dnodeList, false otherwise.
static bool isDnodeInList(SArray *dnodeList, int32_t dnodeId) {
  int32_t remaining = taosArrayGetSize(dnodeList);
  int32_t pos = 0;

  while (pos < remaining) {
    int32_t *pId = (int32_t *)TARRAY_GET_ELEM(dnodeList, pos);
    if (*pId == dnodeId) {
      return true;
    }
    ++pos;
  }
  return false;
}
824

825
#ifdef TD_ENTERPRISE
UNCOV
826
// Load score used by mndCompareDnodeVnodes1:
//   (numOfVnodes + numOfOtherNodes * ratio + additionDnodes) / numOfSupportVnodes
// The score is negated when the dnode already hosts at least one vnode, so (for non-negative
// inputs) such dnodes compare lower than dnodes without vnodes.
static float mndGetDnodeScore1(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;

  if (pDnode->numOfVnodes > 0) {
    return -score;
  }
  return score;
}
831

832
// Comparator over SDnodeObj: orders by ascending mndGetDnodeScore1 (ratio 0.9, no additional
// vnodes) and breaks score ties by ascending dnode id. Returns -1, 0 or 1.
static int32_t mndCompareDnodeVnodes1(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float lhs = mndGetDnodeScore1(pDnode1, 0, 0.9);
  float rhs = mndGetDnodeScore1(pDnode2, 0, 0.9);

  if (lhs == rhs) {
    // Equal scores: fall back to the dnode id.
    if (pDnode1->id == pDnode2->id) return 0;
    return (pDnode1->id > pDnode2->id) ? 1 : -1;
  }

  if (lhs > rhs) return 1;
  return -1;
}
843

844
// sdbTraverse callback over SDB_DNODE objects: refreshes numOfVnodes, bumps numOfOtherNodes when
// the dnode also hosts an mnode, and appends a copy of every dnode with numOfSupportVnodes > 0
// to the SArray passed as p1. The online state is not consulted here; p2 and p3 are unused.
// Returns false only when the push into p1 fails.
static bool mndBuildDnodesListFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pOut = p1;

  bool hostsMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
  if (hostsMnode) {
    pDnode->numOfOtherNodes++;
  }

  if (pDnode->numOfSupportVnodes <= 0) {
    return true;
  }
  return taosArrayPush(pOut, pDnode) != NULL;
}
860

861
// TS-6191
862
// Builds the restricted candidate dnode list used when grantCheckDualReplicaDnodes(pMnode) is true.
//   nDnodes     : initial capacity for the candidate array
//   dnodeList   : optional list of int32_t dnode ids requested by the caller (may be NULL/empty)
//   ppDnodeList : receives the candidate array of SDnodeObj copies; left untouched when the
//                 grant check is false or the allocation fails
// Candidates are every dnode with numOfSupportVnodes > 0, sorted with mndCompareDnodeVnodes1.
// Without a dnodeList only the first two candidates are kept. With a dnodeList the candidates
// are first limited as described inline and then filtered down to the ids present in dnodeList.
// Returns 0 on success or terrno when the candidate array cannot be allocated.
static int32_t mndBuildNodesCheckDualReplica(SMnode *pMnode, int32_t nDnodes, SArray *dnodeList, SArray **ppDnodeList) {
  int32_t code = 0;
  if (!grantCheckDualReplicaDnodes(pMnode)) {
    TAOS_RETURN(code);
  }

  SArray *pCandidates = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    TAOS_RETURN(code = terrno);
  }
  *ppDnodeList = pCandidates;

  SSdb *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesListFp, pCandidates, NULL, NULL);

  int32_t nCandidates = taosArrayGetSize(pCandidates);
  if (nCandidates <= 0) {
    TAOS_RETURN(code);
  }
  if (nCandidates > 1) {
    taosArraySort(pCandidates, (__compar_fn_t)mndCompareDnodeVnodes1);
  }

  if (taosArrayGetSize(dnodeList) <= 0) {
    // No explicit dnode list: keep at most the first two candidates.
    if (nCandidates > 2) {
      taosArrayRemoveBatch(pCandidates, 2, nCandidates - 2, NULL);
    }
    TAOS_RETURN(code);
  }

  // Count the leading candidates that already host vnodes.
  int32_t nWithVnodes = 0;
  while (nWithVnodes < nCandidates) {
    SDnodeObj *pProbe = TARRAY_GET_ELEM(pCandidates, nWithVnodes);
    if (pProbe->numOfVnodes <= 0) {
      break;
    }
    ++nWithVnodes;
  }

  int32_t firstId = -1;
  if (nWithVnodes == 1) {
    firstId = ((SDnodeObj *)TARRAY_GET_ELEM(pCandidates, 0))->id;
  } else if (nWithVnodes >= 2) {
    // must select the dnodes from the 1st 2 dnodes
    taosArrayRemoveBatch(pCandidates, 2, nCandidates - 2, NULL);
  }

  // Drop every candidate the caller did not ask for.
  int32_t idx = 0;
  while (idx < TARRAY_SIZE(pCandidates)) {
    SDnodeObj *pDnode = taosArrayGet(pCandidates, idx);
    if (isDnodeInList(dnodeList, pDnode->id)) {
      ++idx;
    } else {
      taosArrayRemove(pCandidates, idx);
    }
  }

  if (nWithVnodes == 1) {
    SDnodeObj *pFirst = taosArrayGet(pCandidates, 0);
    if (pFirst != NULL && pFirst->id != firstId) {
      // the first dnode is not in dnodeList, remove the last element
      taosArrayRemove(pCandidates, taosArrayGetSize(pCandidates) - 1);
    }
  }

  TAOS_RETURN(code);
}
920
#endif
921

922
// Returns a new SArray of SDnodeObj copies describing the dnodes that can receive vnodes.
//   exceptDnodeId : dnode id that is always left out
//   dnodeList     : optional list of int32_t dnode ids; when non-empty only these are considered
// In TD_ENTERPRISE builds mndBuildNodesCheckDualReplica may produce a narrower list, which then
// replaces dnodeList as the filter. Selection itself is done by mndBuildDnodesArrayFp.
// Returns NULL when the result array cannot be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY) or
// when the dual-replica check fails.
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t nDnodes = mndGetDnodeSize(pMnode);
  SArray *pDualList = NULL;

  SArray *pResult = taosArrayInit(nDnodes, sizeof(SDnodeObj));
  if (pResult == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // An empty caller list is treated the same as no list at all.
  SArray *pUserList = (taosArrayGetSize(dnodeList) > 0) ? dnodeList : NULL;

#ifdef TD_ENTERPRISE
  if (mndBuildNodesCheckDualReplica(pMnode, nDnodes, pUserList, &pDualList) != 0) {
    taosArrayDestroy(pResult);
    return NULL;
  }
#endif

  SArray *pFilter = (pDualList != NULL) ? pDualList : pUserList;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pResult, &exceptDnodeId, pFilter);

  mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pResult));
  int32_t nBuilt = (int32_t)taosArrayGetSize(pResult);
  for (int32_t k = 0; k < nBuilt; ++k) {
    SDnodeObj *pDnode = taosArrayGet(pResult, k);
    mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
  }

  taosArrayDestroy(pDualList);
  return pResult;
}
953

UNCOV
954
// Three-way comparison of two dnode ids: returns 1 when *dnode1Id is larger, -1 when it is
// smaller and 0 when both ids are equal.
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) {
  const int32_t lhs = *dnode1Id;
  const int32_t rhs = *dnode2Id;

  if (lhs > rhs) return 1;
  if (lhs < rhs) return -1;
  return 0;
}
960

961
// Load score of a dnode:
//   (numOfVnodes + numOfOtherNodes * ratio + additionDnodes) / numOfSupportVnodes
// `ratio` weights the non-vnode nodes hosted by the dnode; `additionDnodes` is added to the
// vnode count as-is. No guard against numOfSupportVnodes == 0 is made here.
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
  float load = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
  float score = load / pDnode->numOfSupportVnodes;
  return score;
}
965

966
// Comparator over SDnodeObj ordering by ascending mndGetDnodeScore (ratio 0.9, no additional
// vnodes). Returns 1 when pDnode1 scores higher, 0 on equal scores and -1 otherwise.
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  const float lhs = mndGetDnodeScore(pDnode1, 0, 0.9);
  const float rhs = mndGetDnodeScore(pDnode2, 0, 0.9);

  if (lhs > rhs) return 1;
  if (lhs == rhs) return 0;
  return -1;
}
974

975
// Sorts the first `replica` entries of pVgroup->vnodeGid in place by ascending dnodeId, using
// repeated passes that swap adjacent out-of-order entries (entries with equal ids keep their order).
void mndSortVnodeGid(SVgObj *pVgroup) {
  // After each pass the largest remaining id sits at index `last`, so the next pass is one shorter.
  for (int32_t last = pVgroup->replica - 1; last > 0; --last) {
    for (int32_t k = 0; k < last; ++k) {
      if (pVgroup->vnodeGid[k].dnodeId > pVgroup->vnodeGid[k + 1].dnodeId) {
        TSWAP(pVgroup->vnodeGid[k], pVgroup->vnodeGid[k + 1]);
      }
    }
  }
}
2,390,968✔
984

985
// Assigns the `replica` vnodes of pVgroup to dnodes taken from pArray (an SArray of SDnodeObj).
// pArray is sorted by ascending mndGetDnodeScore and its first `replica` entries are used.
// For every chosen entry the function checks vnode capacity and memory, records the dnode id and
// an initial sync state in pVgroup->vnodeGid, and updates the entry's memUsed and numOfVnodes so
// later calls on the same array see the new load. Finally vnodeGid is sorted by dnode id.
// Returns 0 on success, or TSDB_CODE_MND_NO_ENOUGH_DNODES / TSDB_CODE_MND_NO_ENOUGH_VNODES /
// TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE. Updates made to earlier entries are not rolled back
// when a later replica fails.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
  mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t nDnodes = taosArrayGetSize(pArray);
  for (int32_t k = 0; k < nDnodes; ++k) {
    SDnodeObj *pEntry = taosArrayGet(pArray, k);
    mDebug("dnode:%d, score:%f", pEntry->id, mndGetDnodeScore(pEntry, 0, 0.9));
  }

  if (nDnodes < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, nDnodes,
           pVgroup->replica);
    TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }
    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    // The dnode must still have memory left after this vgroup's share is reserved.
    int64_t vgMem = mndGetVgroupMemory(pMnode, pDb, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d, avail:%" PRId64 " used:%" PRId64,
             pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }
    pDnode->memUsed += vgMem;

    SVnodeGid *pSlot = &pVgroup->vnodeGid[v];
    pSlot->dnodeId = pDnode->id;
    // A single-replica vgroup starts as leader; otherwise every member starts as follower.
    pSlot->syncState = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mInfo("db:%s, vgId:%d, vn:%d is alloced, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pVgroup->dbName, pVgroup->vgId, v, vgMem, pSlot->dnodeId, pDnode->memAvail, pDnode->memUsed);
    pDnode->numOfVnodes++;
  }

  mndSortVnodeGid(pVgroup);
  return 0;
}
1034

UNCOV
1035
// Initializes *pVgroup as a single-replica TSMA vgroup of database pDb and places its vnode on an
// available dnode.
// The vgroup gets the id returned by sdbGetMaxId for SDB_VGROUP, isTsma = 1, version 1,
// created/update time "now", replica = 1 and keepVersion = -1.
// Returns 0 on success. When the candidate dnode array cannot be built, returns terrno if it is
// set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. When no dnode can take the vnode, returns -1.
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  pVgroup->isTsma = 1;
  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->updateTime = pVgroup->createdTime;
  pVgroup->version = 1;
  memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
  pVgroup->dbUid = pDb->uid;
  pVgroup->replica = 1;
  pVgroup->keepVersion = -1;  // default: WAL keep version disabled
  pVgroup->keepVersionTime = 0;

  // Release the candidate array on every path; it used to leak when the allocation failed.
  code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray);
  taosArrayDestroy(pArray);
  if (code != 0) {
    // Keep the historical -1 return value for callers.
    // NOTE(review): the specific reason is presumably left in terrno by TAOS_RETURN — confirm.
    return -1;
  }

  mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
  return 0;
}
1061

1062
// Allocates and initializes the pDb->cfg.numOfVgroups vgroups of a new database.
//   dnodeList : optional list of int32_t dnode ids restricting where vnodes may be placed
//   ppVgroups : on success receives a taosMemoryCalloc'ed array of numOfVgroups SVgObj
//               (presumably owned by the caller afterwards — confirm); untouched on failure
// Vgroup ids continue from sdbGetMaxId(SDB_VGROUP), but never start below 2. The hash space
// [0, UINT32_MAX] is cut into numOfVgroups equal intervals; the last vgroup's range is extended
// to UINT32_MAX so that the division remainder is covered as well.
// Returns 0 on success or an error code; on failure the vgroup array is freed.
// NOTE(review): numOfVgroups is used as a divisor and is assumed to be > 0 (validated by the
// caller?) — confirm.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    // Never report success for a failed allocation, even if terrno was not set.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time; the previous code read element 0 of the array.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;
    pVgroup->keepVersion = -1;  // default: WAL keep version disabled
    pVgroup->keepVersionTime = 0;

    // The same candidate array is reused, so load added by earlier vgroups is visible here.
    if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
1127

1128
// Builds the endpoint set of a vgroup from the fqdn/port of the dnodes hosting its replicas.
// Replicas whose dnode cannot be acquired are skipped. epset.inUse is pointed at the slot of a
// replica whose sync state is LEADER or ASSIGNED_LEADER (the last such replica wins). A failed
// addEpIntoEpSet is only logged. The set is passed through epsetSort before it is returned.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode != NULL) {
      bool leads = (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leads) {
        // The endpoint added next lands at index numOfEps.
        epset.inUse = epset.numOfEps;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  epsetSort(&epset);
  return epset;
}
1149

1150
// Looks up vgroup `vgId` and builds its endpoint set from the fqdn/port of the dnodes hosting
// its replicas. Returns an all-zero SEpSet when the vgroup cannot be acquired. Replicas whose
// dnode cannot be acquired are skipped, and epset.inUse is pointed at the slot of a replica whose
// sync state is LEADER or ASSIGNED_LEADER. The set is returned in replica order (no epsetSort here).
SEpSet mndGetVgroupEpsetById(SMnode *pMnode, int32_t vgId) {
  SEpSet epset = {0};

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return epset;
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[v];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pDnode != NULL) {
      bool leads = (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      if (leads) {
        // The endpoint added next lands at index numOfEps.
        epset.inUse = epset.numOfEps;
      }

      if (addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port) != 0) {
        mWarn("vgId:%d, failed to add ep:%s:%d into epset", pVgroup->vgId, pDnode->fqdn, pDnode->port);
      }
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  mndReleaseVgroup(pMnode, pVgroup);
  return epset;
}
1174

1175
// Retrieve callback of the vgroups show table: fills up to `rows` rows of pBlock, one per vgroup,
// continuing from pShow->pIter. When pShow->db is non-empty only vgroups of that database are listed.
// Columns, in order: vgId, db name, table count, then for each of 4 replica slots (dnode id,
// role, apply/commit progress), then is-ready flag, cache usage, cached table count, isTsma,
// mountVgId, keepVersion and keepVersionTime.
// Returns the number of rows written (also added to pShow->numOfRows), or the error code when
// `code` was set on the way to _OVER.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  // NOTE(review): these locals keep their original names because the MND_SHOW_* and
  // COL_DATA_SET_VAL_GOTO macros presumably reference some of them by name — confirm before renaming.
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SDbObj   *pDb = NULL;
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VGROUPS, PRIV_OBJ_DB, 0, _OVER);

  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      goto _OVER;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    // Skip vgroups that belong to another database when a database filter is active.
    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VGROUPS, _OVER);

    // Column: vgroup id
    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

    // Column: database name, extracted from the stored dbName and stored as a var string
    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      mError("vgId:%d, failed to set dbName, since %s", pVgroup->vgId, tstrerror(code));
      sdbRelease(pSdb, pVgroup);
      // sdbCancelFetch(pSdb, pShow->pIter);
      goto _OVER;
    }
    (void)tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)db, false, pVgroup, pShow->pIter, _OVER);

    // Column: number of tables
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfTables, false, pVgroup, pShow->pIter, _OVER);

    bool       isReady = false;
    bool       isLeaderRestored = false;
    bool       hasFollowerRestored = false;
    ESyncState leaderState = TAOS_SYNC_STATE_OFFLINE;

    // default 3 replica, add 1 replica if move vnode
    for (int32_t i = 0; i < 4; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      if (i >= pVgroup->replica) {
        // Unused replica slot: dnode id, role and progress columns are all NULL.
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataSetNULL(pColInfo, numOfRows);
        continue;
      }

      const SVnodeGid *pGid = &pVgroup->vnodeGid[i];

      // Slot column 1: dnode id
      int16_t dnodeId = (int16_t)pGid->dnodeId;
      COL_DATA_SET_VAL_GOTO((const char *)&dnodeId, false, pVgroup, pShow->pIter, _OVER);

      bool       exist = false;
      bool       online = false;
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode != NULL) {
        exist = true;
        online = mndIsDnodeOnline(pDnode, curMs);
        mndReleaseDnode(pMnode, pDnode);
      }

      // Slot column 2: role. "dropping" when the dnode no longer exists, "offline" when it is
      // not online, otherwise the textual sync state.
      const bool isLeader =
          (pGid->syncState == TAOS_SYNC_STATE_LEADER || pGid->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
      char buf1[20] = {0};
      char role[20] = "offline";
      if (!exist) {
        tstrncpy(role, "dropping", sizeof(role));
      } else if (online) {
        if (isLeader) {
          if (pGid->syncRestore) {
            isLeaderRestored = true;
          }
          leaderState = pGid->syncState;
        } else if (pGid->syncState == TAOS_SYNC_STATE_FOLLOWER) {
          if (pGid->syncRestore) {
            hasFollowerRestored = true;
          }
        }
        snprintf(role, sizeof(role), "%s", syncStr(pGid->syncState));
      }
      STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf1, false, pVgroup, pShow->pIter, _OVER);

      // Slot column 3: applied/committed index, extended with the total index and learner
      // progress for learners and with the snapshot sequence while 0 < snapSeq < SYNC_SNAPSHOT_SEQ_END.
      char applyStr[TSDB_SYNC_APPLY_COMMIT_LEN + 1] = {0};
      char buf[TSDB_SYNC_APPLY_COMMIT_LEN + VARSTR_HEADER_SIZE + 1] = {0};

      const bool isLearner = (pGid->syncState == TAOS_SYNC_STATE_LEARNER);
      const bool inSnapshot = (pGid->snapSeq > 0 && pGid->snapSeq < SYNC_SNAPSHOT_SEQ_END);

      if (isLearner) {
        if (pDb != NULL) {
          mInfo("db:%s, learner progress:%d", pDb->name, pGid->learnerProgress);
        } else {
          mInfo("db:null, learner progress:%d", pGid->learnerProgress);
        }

        if (inSnapshot) {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(snap:%d)(learner:%d)",
                   pGid->syncAppliedIndex, pGid->syncCommitIndex, pGid->syncTotalIndex, pGid->snapSeq,
                   pGid->learnerProgress);
        } else {
          snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "/%" PRId64 "(learner:%d)",
                   pGid->syncAppliedIndex, pGid->syncCommitIndex, pGid->syncTotalIndex, pGid->learnerProgress);
        }
      } else if (inSnapshot) {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64 "(snap:%d)", pGid->syncAppliedIndex,
                 pGid->syncCommitIndex, pGid->snapSeq);
      } else {
        snprintf(applyStr, sizeof(applyStr), "%" PRId64 "/%" PRId64, pGid->syncAppliedIndex, pGid->syncCommitIndex);
      }

      STR_WITH_MAXSIZE_TO_VARSTR(buf, applyStr, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&buf, false, pVgroup, pShow->pIter, _OVER);
    }

    // Column: is-ready. With 3+ replicas, or 2 replicas under a regular leader, both the leader
    // and at least one follower must be restored; with an assigned leader or a single replica a
    // restored leader is enough.
    if (pVgroup->replica >= 3) {
      isReady = (isLeaderRestored && hasFollowerRestored);
    } else if (pVgroup->replica == 2) {
      if (leaderState == TAOS_SYNC_STATE_LEADER) {
        isReady = (isLeaderRestored && hasFollowerRestored);
      } else if (leaderState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
        isReady = isLeaderRestored;
      }
    } else {
      isReady = isLeaderRestored;
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&isReady, false, pVgroup, pShow->pIter, _OVER);

    // Column: cache usage
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int64_t cacheUsage = (int64_t)pVgroup->cacheUsage;
    COL_DATA_SET_VAL_GOTO((const char *)&cacheUsage, false, pVgroup, pShow->pIter, _OVER);

    // Column: number of cached tables
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->numOfCachedTables, false, pVgroup, pShow->pIter, _OVER);

    // Column: isTsma
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->isTsma, false, pVgroup, pShow->pIter, _OVER);

    // Column: mountVgId
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->mountVgId, false, pVgroup, pShow->pIter, _OVER);

    // Column: keepVersion
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersion, false, pVgroup, pShow->pIter, _OVER);

    // Column: keepVersionTime
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->keepVersionTime, false, pVgroup, pShow->pIter, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to retrieve vgroup info at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1374

1375
// Cancels the SDB_VGROUP fetch identified by pIter on this mnode's sdb.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
790✔
1379

1380
// Traverse callback used by mndGetVnodesNum.
//   pObj : the vgroup being visited (SVgObj *)
//   p1   : pointer to the dnode id to look for (int32_t *)
//   p2   : pointer to the running counter (int32_t *), bumped once per replica placed on that dnode
//   p3   : unused
// Always returns true.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVg = pObj;
  int32_t  wantedDnodeId = *(int32_t *)p1;
  int32_t *pCounter = (int32_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnodeId) {
      *pCounter += 1;
    }
    ++idx;
  }

  return true;
}
1393

1394
// Returns how many vnode replicas, across all SDB_VGROUP objects, are placed on the dnode `dnodeId`.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t total = 0;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);
  return total;
}
1399

1400
// Computes the memory figure for one vgroup from its database configuration:
//   cfg.buffer * 1024 * 1024 + cfg.pages * cfg.pageSize * 1024, plus cfg.cacheLastSize * 1024 * 1024
//   when cfg.cacheLast > 0.
// pDbInput may be NULL, in which case the database is acquired by pVgroup->dbName and released again
// before returning. Returns 0 when no database object is available.
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
  const bool acquiredHere = (pDbInput == NULL);
  SDbObj    *pDb = acquiredHere ? mndAcquireDb(pMnode, pVgroup->dbName) : pDbInput;
  int64_t    total = 0;

  if (pDb != NULL) {
    int64_t bufferBytes = (int64_t)pDb->cfg.buffer * 1024 * 1024;
    int64_t cacheBytes = (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
    int64_t cacheLastBytes = (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;

    total = bufferBytes + cacheBytes;
    // the last-row cache only counts when it is enabled for the database
    total += (pDb->cfg.cacheLast > 0) ? cacheLastBytes : 0;

    mDebug("db:%s, vgroup:%d, buffer:%" PRId64 " cache:%" PRId64 " cacheLast:%" PRId64, pDb->name, pVgroup->vgId,
           bufferBytes, cacheBytes, cacheLastBytes);
  }

  // only give back what this function acquired itself
  if (acquiredHere) {
    mndReleaseDb(pMnode, pDb);
  }
  return total;
}
1424

1425
// Traverse callback used by mndGetVnodesMemory.
//   pObj : the vgroup being visited (SVgObj *)
//   p1   : pointer to the dnode id to look for (int32_t *)
//   p2   : pointer to the running total (int64_t *); mndGetVgroupMemory() of this vgroup is added
//          once for every replica placed on that dnode
//   p3   : unused
// Always returns true.
static bool mndGetVnodeMemroyFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVg = pObj;
  int32_t  wantedDnodeId = *(int32_t *)p1;
  int64_t *pTotal = (int64_t *)p2;

  int32_t idx = 0;
  while (idx < pVg->replica) {
    if (pVg->vnodeGid[idx].dnodeId == wantedDnodeId) {
      *pTotal += mndGetVgroupMemory(pMnode, NULL, pVg);
    }
    ++idx;
  }

  return true;
}
1438

1439
// Sums mndGetVgroupMemory() over every vnode replica placed on the dnode `dnodeId`.
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId) {
  SSdb   *pSdb = pMnode->pSdb;
  int64_t total = 0;

  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodeMemroyFp, &dnodeId, &total, NULL);
  return total;
}
1444

UNCOV
1445
// Formats the estimated time to apply `applyCount` pending entries at `rate` as "H:M:S" into restoreStr.
//   rate           : apply rate; applyCount / rate is treated as milliseconds (it is divided by 1000 below)
//   applyCount     : number of entries still to apply
//   restoreStr     : output buffer, always NUL-terminated by snprintf when restoreStrSize > 0
//   restoreStrSize : size of restoreStr in bytes
// Writes "0:0:0" when no estimate can be made (rate is zero, negative or NaN, or applyCount is not positive).
// Does nothing when restoreStr is NULL or restoreStrSize is 0.
void calculateRstoreFinishTime(double rate, int64_t applyCount, char *restoreStr, size_t restoreStrSize) {
  if (restoreStr == NULL || restoreStrSize == 0) {
    return;
  }

  // !(rate > 0) is deliberately used instead of (rate <= 0) so that NaN is rejected as well
  if (!(rate > 0) || applyCount <= 0) {
    snprintf(restoreStr, restoreStrSize, "0:0:0");
    return;
  }

  // Converting a double that does not fit into int64_t is undefined behavior, so clamp a huge
  // (or infinite) estimate before the conversion. (double)INT64_MAX rounds up to 2^63.
  double  estimate = (double)applyCount / rate;
  int64_t costTime = (estimate >= (double)INT64_MAX) ? INT64_MAX : (int64_t)estimate;

  int64_t totalSeconds = costTime / 1000;
  int64_t hours = totalSeconds / 3600;
  totalSeconds %= 3600;
  int64_t minutes = totalSeconds / 60;
  int64_t seconds = totalSeconds % 60;
  snprintf(restoreStr, restoreStrSize, "%" PRId64 ":%" PRId64 ":%" PRId64, hours, minutes, seconds);
}
1459

1460
// Fills pBlock with one row per vnode replica of the vgroups fetched through pShow->pIter.
// Columns written per row, in order: dnode id, vgroup id, db name, sync state string, role time,
// start time, sync-restore flag, estimated restore time ("H:M:S"), unapplied entry count,
// buffer segment used, buffer segment size.
// Returns the number of rows produced by this call (also added to pShow->numOfRows), or an
// error code when `code` was set non-zero on the way to _OVER.
// NOTE(review): several locals (pUser, objFName, showAll, showIter, dbUid, pVgDb, pIterDb, lino, ...)
// are not referenced directly here; presumably the MND_SHOW_CHECK_* / COL_DATA_SET_VAL_GOTO macros
// use them by name, so they are kept untouched - confirm before removing any.
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SVgObj   *pVgroup = NULL;
  SDbObj   *pVgDb = NULL;
  int32_t   cols = 0;
  int64_t   curMs = taosGetTimestampMs();
  int32_t   code = 0, lino = 0;
  SUserObj *pUser = NULL;
  SDbObj   *pDb = NULL, *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_VNODES, PRIV_OBJ_DB, 0, _OVER);

  // stop early enough that a whole vgroup (up to TSDB_MAX_REPLICA rows) still fits into the block
  while (numOfRows < rows - TSDB_MAX_REPLICA) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pVgroup->dbName, pVgroup, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_VNODES, _OVER);

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
      SVnodeGid       *pGid = &pVgroup->vnodeGid[i];
      SColumnInfoData *pColInfo = NULL;
      cols = 0;

      // dnode id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->dnodeId, false, pVgroup, pShow->pIter, _OVER);
      // vgroup id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pVgroup->vgId, false, pVgroup, pShow->pIter, _OVER);

      // db_name
      const char *dbname = mndGetDbStr(pVgroup->dbName);
      char        b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (dbname != NULL) {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, dbname, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      } else {
        STR_WITH_MAXSIZE_TO_VARSTR(b1, "NULL", TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE);
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)b1, false, pVgroup, pShow->pIter, _OVER);

      // dnode is online?
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
      if (pDnode == NULL) {
        mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
        // NOTE(review): this abandons the remaining replicas of the vgroup; the partially
        // written row is not counted because numOfRows is not incremented.
        break;
      }
      bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
      sdbRelease(pSdb, pDnode);

      // sync state, reported as offline whenever the hosting dnode is not online
      char       buf[20] = {0};
      ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
      STR_TO_VARSTR(buf, syncStr(syncState));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)buf, false, pVgroup, pShow->pIter, _OVER);

      // role time / start time are only meaningful while the dnode is online
      int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&roleTimeMs, false, pVgroup, pShow->pIter, _OVER);

      int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&startTimeMs, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->syncRestore, false, pVgroup, pShow->pIter, _OVER);

      // estimated restore time; stays an empty string when nothing is waiting to be applied
      int64_t unappliedCount = pGid->syncCommitIndex - pGid->syncAppliedIndex;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      char restoreStr[20] = {0};
      if (unappliedCount > 0) {
        calculateRstoreFinishTime(pGid->appliedRate, unappliedCount, restoreStr, sizeof(restoreStr));
      }
      // restoreStr can hold up to sizeof(restoreStr) - 1 characters, so the varstr needs room for
      // the header as well; reusing the 20-byte `buf` could be overrun by a long estimate.
      char restoreVar[sizeof(restoreStr) + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(restoreVar, restoreStr);
      COL_DATA_SET_VAL_GOTO((const char *)restoreVar, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&unappliedCount, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentUsed, false, pVgroup, pShow->pIter, _OVER);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      COL_DATA_SET_VAL_GOTO((const char *)&pGid->bufferSegmentSize, false, pVgroup, pShow->pIter, _OVER);

      numOfRows++;
    }
    sdbRelease(pSdb, pVgroup);
  }
_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve vnode info at line %d since %s", lino, tstrerror(code));
    return code;
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1562

UNCOV
1563
// Cancels the SDB_VGROUP fetch identified by pIter on this mnode's sdb
// (vnode rows are produced by iterating vgroups).
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_VGROUP);
}
×
1567

1568
// Adds one replica to pVgroup on the first suitable dnode of pArray and appends the updated
// vgroup to pTrans as a group redo log.
//   pArray : SDnodeObj elements; sorted in place with mndCompareDnodeVnodes before selection
// A dnode is skipped when it already hosts a replica of this vgroup. The first remaining dnode
// must have a free vnode slot and enough memory, otherwise the function fails immediately
// (it does not try later candidates).
// On success pVgroup->replica, pDnode->numOfVnodes and pDnode->memUsed are updated.
// Returns 0 on success, TSDB_CODE_MND_NO_ENOUGH_DNODES / _VNODES / _MEM_IN_DNODE when no replica
// can be placed, or the error of encoding / appending the redo log.
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
  int32_t code = 0;
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
          pDnode->numOfOtherNodes);
  }

  // the new replica goes into the first unused slot
  SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
  for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // validate the element before it is dereferenced below
    if (pDnode == NULL) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_DNODES);
    }

    bool used = false;
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_VNODES);
    }

    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
    if (pDnode->memAvail - vgMem - pDnode->memUsed <= 0) {
      mError("trans:%d, db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
             pTrans->id, pVgroup->dbName, pVgroup->vgId, vgMem, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    } else {
      pDnode->memUsed += vgMem;
    }

    pVgid->dnodeId = pDnode->id;
    pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
          pTrans->id, pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail,
          pDnode->memUsed);

    pVgroup->replica++;
    pDnode->numOfVnodes++;

    SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
    if (pVgRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId)) != 0) {
      sdbFreeRaw(pVgRaw);
      TAOS_RETURN(code);
    }
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
    if (code != 0) {
      mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
             tstrerror(code), __LINE__);
    }
    TAOS_RETURN(code);
  }

  // every dnode in pArray already hosts a replica of this vgroup (or pArray is empty)
  code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  mError("trans:%d, db:%s, failed to add vnode to vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
         tstrerror(code));
  TAOS_RETURN(code);
}
1638

1639
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
16,452✔
1640
                                        SVnodeGid *pDelVgid) {
1641
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
16,452✔
1642
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
63,502✔
1643
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
47,050✔
1644
    mInfo("trans:%d, dnode:%d, equivalent vnodes:%d others:%d", pTrans->id, pDnode->id, pDnode->numOfVnodes,
47,050✔
1645
          pDnode->numOfOtherNodes);
1646
  }
1647

1648
  int32_t code = -1;
16,452✔
1649
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
18,936✔
1650
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
18,659✔
1651

1652
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
36,522✔
1653
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
34,038✔
1654
      if (pVgid->dnodeId == pDnode->id) {
34,038✔
1655
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
16,175✔
1656
        pDnode->memUsed -= vgMem;
16,175✔
1657
        mInfo("trans:%d, db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64
16,175✔
1658
              " used:%" PRId64,
1659
              pTrans->id, pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1660
        pDnode->numOfVnodes--;
16,175✔
1661
        pVgroup->replica--;
16,175✔
1662
        *pDelVgid = *pVgid;
16,175✔
1663
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
16,175✔
1664
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
16,175✔
1665
        code = 0;
16,175✔
1666
        goto _OVER;
16,175✔
1667
      }
1668
    }
1669
  }
1670

1671
_OVER:
277✔
1672
  if (code != 0) {
16,452✔
1673
    code = TSDB_CODE_APP_ERROR;
277✔
1674
    mError("trans:%d, db:%s, failed to remove vnode from vgId:%d since %s", pTrans->id, pVgroup->dbName, pVgroup->vgId,
277✔
1675
           tstrerror(code));
1676
    TAOS_RETURN(code);
277✔
1677
  }
1678

1679
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
45,207✔
1680
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
29,032✔
1681
    mInfo("trans:%d, db:%s, vgId:%d, vn:%d dnode:%d is reserved", pTrans->id, pVgroup->dbName, pVgroup->vgId, vn,
29,032✔
1682
          pVgid->dnodeId);
1683
  }
1684

1685
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
16,175✔
1686
  if (pVgRaw == NULL) {
16,175✔
UNCOV
1687
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1688
    if (terrno != 0) code = terrno;
×
UNCOV
1689
    TAOS_RETURN(code);
×
1690
  }
1691
  if (mndTransAppendGroupRedolog(pTrans, pVgRaw, pVgroup->vgId) != 0) {
16,175✔
1692
    sdbFreeRaw(pVgRaw);
×
UNCOV
1693
    TAOS_RETURN(code);
×
1694
  }
1695
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
16,175✔
1696
  if (code != 0) {
16,175✔
UNCOV
1697
    mError("trans:%d, vgId:%d, failed to set raw status since %s at line:%d", pTrans->id, pVgroup->vgId,
×
1698
           tstrerror(code), __LINE__);
1699
  }
1700

1701
  TAOS_RETURN(code);
16,175✔
1702
}
1703

UNCOV
1704
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
×
1705
                                                   SVnodeGid *pDelVgid) {
UNCOV
1706
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
×
1707
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
UNCOV
1708
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
×
1709
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
×
1710
  }
1711

1712
  int32_t code = -1;
×
UNCOV
1713
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
×
UNCOV
1714
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
×
1715

1716
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
1717
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
UNCOV
1718
      if (pVgid->dnodeId == pDnode->id) {
×
1719
        int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
×
1720
        pDnode->memUsed -= vgMem;
×
1721
        mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
×
1722
              pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
1723
        pDnode->numOfVnodes--;
×
1724
        pVgroup->replica--;
×
UNCOV
1725
        *pDelVgid = *pVgid;
×
1726
        *pVgid = pVgroup->vnodeGid[pVgroup->replica];
×
1727
        memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
×
1728
        code = 0;
×
1729
        goto _OVER;
×
1730
      }
1731
    }
1732
  }
1733

UNCOV
1734
_OVER:
×
UNCOV
1735
  if (code != 0) {
×
UNCOV
1736
    code = TSDB_CODE_APP_ERROR;
×
1737
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, tstrerror(code));
×
1738
    TAOS_RETURN(code);
×
1739
  }
1740

1741
  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
×
UNCOV
1742
    SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
×
UNCOV
1743
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
×
1744
  }
1745

1746
  TAOS_RETURN(code);
×
1747
}
1748

1749
// Appends a TDMT_DND_CREATE_VNODE redo action for the replica pVgid of pVgroup to pTrans.
// The action is addressed to the dnode pVgid->dnodeId, accepts TSDB_CODE_VND_ALREADY_EXIST and is
// grouped by the vgroup id.
// Returns 0 on success. When the dnode cannot be acquired or the request cannot be built, returns
// terrno if it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error of
// mndTransAppendRedoAction (the request buffer is freed in that case).
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // build the request while the dnode reference is still held; it is released right afterwards
  int32_t contLen = 0;
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure reason before any further call can touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_CREATE_VNODE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.groupId = pVgroup->vgId;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1775

1776
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,110✔
1777
                                       SDnodeObj *pDnode) {
1778
  int32_t      code = 0;
2,110✔
1779
  STransAction action = {0};
2,110✔
1780

1781
  action.epSet = mndGetDnodeEpset(pDnode);
2,110✔
1782

1783
  int32_t contLen = 0;
2,110✔
1784
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
2,110✔
1785
  if (pReq == NULL) {
2,110✔
UNCOV
1786
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1787
    if (terrno != 0) code = terrno;
×
UNCOV
1788
    TAOS_RETURN(code);
×
1789
  }
1790

1791
  action.pCont = pReq;
2,110✔
1792
  action.contLen = contLen;
2,110✔
1793
  action.msgType = TDMT_DND_CREATE_VNODE;
2,110✔
1794
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
2,110✔
1795
  action.groupId = pVgroup->vgId;
2,110✔
1796

1797
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,110✔
UNCOV
1798
    taosMemoryFree(pReq);
×
UNCOV
1799
    TAOS_RETURN(code);
×
1800
  }
1801

1802
  TAOS_RETURN(code);
2,110✔
1803
}
1804

1805
// Appends a TDMT_VND_ALTER_CONFIRM redo action for pVgroup to pTrans.
// The message body is a bare SMsgHead carrying the content length and vgroup id in network byte order.
// Returns 0 on success, terrno when the header cannot be allocated, otherwise the error of
// mndTransAppendRedoAction.
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t      code = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  mInfo("trans:%d, vgId:%d, build alter vnode confirm req", pTrans->id, pVgroup->vgId);

  int32_t   headLen = sizeof(SMsgHead);
  SMsgHead *pMsgHead = taosMemoryMalloc(headLen);
  if (pMsgHead == NULL) {
    TAOS_RETURN(terrno);
  }
  pMsgHead->contLen = htonl(headLen);
  pMsgHead->vgId = htonl(pVgroup->vgId);

  action.msgType = TDMT_VND_ALTER_CONFIRM;
  // an incorrect redirect result will cause this error, so it is registered as the retry code
  action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
  action.groupId = pVgroup->vgId;
  action.pCont = pMsgHead;
  action.contLen = headLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsgHead);
  }
  TAOS_RETURN(code);
}
1834

UNCOV
1835
// Appends a TDMT_SYNC_CONFIG_CHANGE redo action for pNewVgroup to pTrans.
// The message is an SMsgHead (total length and vgroup id in network byte order) followed by the
// payload produced by mndBuildAlterVnodeReplicaReq for dnodeId.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the payload
// cannot be built; terrno if the message cannot be allocated; otherwise the error of
// mndTransAppendRedoAction.
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pOldVgroup, SVgObj *pNewVgroup,
                                 int32_t dnodeId) {
  int32_t      code = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  int32_t bodyLen = 0;
  void   *pBody = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &bodyLen);
  if (pBody == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  int32_t   msgLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    taosMemoryFree(pBody);
    TAOS_RETURN(terrno);
  }

  pMsg->contLen = htonl(msgLen);
  pMsg->vgId = htonl(pNewVgroup->vgId);

  // the payload goes right behind the header; the standalone copy is no longer needed afterwards
  memcpy((void *)(pMsg + 1), pBody, bodyLen);
  taosMemoryFree(pBody);

  action.msgType = TDMT_SYNC_CONFIG_CHANGE;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1874

1875
// Appends a TDMT_VND_ALTER_HASHRANGE redo action (built from srcVgId and pVgroup) to pTrans.
// The action is addressed to the vgroup's epset and accepts TSDB_CODE_VND_ALREADY_EXIST.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request
// cannot be built; otherwise the error of mndTransAppendRedoAction.
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
  int32_t      code = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_ALTER_HASHRANGE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId);
  TAOS_RETURN(code);
}
1901

1902
// Appends a TDMT_VND_ALTER_CONFIG redo action for pVgroup to pTrans, addressed to the vgroup's
// epset and grouped by the vgroup id.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request
// cannot be built; otherwise the error of mndTransAppendRedoAction.
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t      code = 0;
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_ALTER_CONFIG;
  action.groupId = pVgroup->vgId;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1927

1928
// Encodes pVg, appends it to pTrans as a prepare log and marks the raw as SDB_STATUS_CREATING.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding
// fails; the error of mndTransAppendPrepareLog (the raw is freed in that case); or the error of
// sdbSetRawStatus.
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _err;
  }

  TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);

  // The raw has been appended to the transaction, so it is not freed here any more.
  // Keep the status result in `code` so a failure is both logged and reported to the caller.
  code = sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }
  TAOS_RETURN(code);

_err:
  sdbFreeRaw(pRaw);
  TAOS_RETURN(code);
}
1952

1953
// Appends a TDMT_VND_ALTER_REPLICA redo action for pVgroup, addressed to the dnode `dnodeId`
// and grouped by the vgroup id, to pTrans.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the error of mndTransAppendRedoAction.
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // only the endpoint set is needed from the dnode, so it is released immediately
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_ALTER_REPLICA;
  action.groupId = pVgroup->vgId;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
1986

UNCOV
1987
// Appends a TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP redo action for pVgroup, addressed to the dnode
// `dnodeId`, to pTrans. The action accepts TSDB_CODE_VND_ALREADY_IS_VOTER and retries on
// TSDB_CODE_VND_NOT_CATCH_UP.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the error of mndTransAppendRedoAction.
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // only the endpoint set is needed from the dnode, so it is released immediately
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
2021

2022
// Appends a TDMT_DND_ALTER_VNODE_TYPE redo action for pVgroup, addressed to the dnode `dnodeId`
// and grouped by the vgroup id, to pTrans. The action accepts TSDB_CODE_VND_ALREADY_IS_VOTER and
// retries on TSDB_CODE_VND_NOT_CATCH_UP. The payload comes from mndBuildAlterVnodeReplicaReq.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the dnode
// cannot be acquired or the request cannot be built; otherwise the error of mndTransAppendRedoAction.
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t    code = 0;
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // only the endpoint set is needed from the dnode, so it is released immediately
  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
  action.groupId = pVgroup->vgId;
  action.pCont = pMsg;
  action.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
  }
  TAOS_RETURN(code);
}
2057

2058
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
2,110✔
2059
                                          SDnodeObj *pDnode) {
2060
  int32_t      code = 0;
2,110✔
2061
  STransAction action = {0};
2,110✔
2062
  action.epSet = mndGetDnodeEpset(pDnode);
2,110✔
2063

2064
  int32_t contLen = 0;
2,110✔
2065
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
2,110✔
2066
  if (pReq == NULL) {
2,110✔
UNCOV
2067
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2068
    if (terrno != 0) code = terrno;
×
UNCOV
2069
    TAOS_RETURN(code);
×
2070
  }
2071

2072
  action.pCont = pReq;
2,110✔
2073
  action.contLen = contLen;
2,110✔
2074
  action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
2,110✔
2075
  action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
2,110✔
2076
  action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
2,110✔
2077
  action.groupId = pVgroup->vgId;
2,110✔
2078

2079
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,110✔
UNCOV
2080
    taosMemoryFree(pReq);
×
UNCOV
2081
    TAOS_RETURN(code);
×
2082
  }
2083

2084
  TAOS_RETURN(code);
2,110✔
2085
}
2086

2087
// Appends a redo action to pTrans that sends TDMT_VND_DISABLE_WRITE for pVgroup->vgId to dnode `dnodeId`.
//
// The dnode is acquired only long enough to copy its endpoint set and is released before the request
// body is built.
//
// Returns 0 on success. If the dnode cannot be acquired or the request cannot be built, returns terrno
// when it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. If the action cannot be appended, the
// request buffer is freed here and the append error is returned.
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t dnodeId) {
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  STransAction redoAction = {0};
  redoAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t reqLen = 0;
  void   *pCont = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &reqLen);
  if (pCont == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  redoAction.pCont = pCont;
  redoAction.contLen = reqLen;
  redoAction.msgType = TDMT_VND_DISABLE_WRITE;

  int32_t code = mndTransAppendRedoAction(pTrans, &redoAction);
  if (code != 0) {
    taosMemoryFree(pCont);
  }
  TAOS_RETURN(code);
}
2120

2121
// Appends an action to pTrans that sends TDMT_DND_DROP_VNODE for pVgroup to the dnode named by
// pVgid->dnodeId.
//
// The action is appended as a redo action when isRedo is true and as an undo action otherwise.
// TSDB_CODE_VND_NOT_EXIST is accepted as a non-error reply, and the action is grouped under
// pVgroup->vgId.
//
// Returns 0 on success. If the dnode cannot be acquired or the request cannot be built, returns terrno
// when it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. If the action cannot be appended, the
// request buffer is freed here and the append error is returned.
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
                              bool isRedo) {
  int32_t      code = 0;
  STransAction action = {0};

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);

  // pDnode is an argument of the request builder, so the reference must stay held until the request
  // has been built; releasing it first would hand the builder an object we no longer own.
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // Capture the failure reason before the release call has a chance to touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  if (isRedo) {
    code = mndTransAppendRedoAction(pTrans, &action);
  } else {
    code = mndTransAppendUndoAction(pTrans, &action);
  }
  if (code != 0) {
    taosMemoryFree(pReq);
  }

  TAOS_RETURN(code);
}
2163

2164
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
14,003✔
2165
                                    SArray *pArray, bool force, bool unsafe) {
2166
  int32_t code = 0;
14,003✔
2167
  SVgObj  newVg = {0};
14,003✔
2168
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
14,003✔
2169

2170
  mInfo("vgId:%d, trans:%d, vgroup info before move, replica:%d", newVg.vgId, pTrans->id, newVg.replica);
14,003✔
2171
  for (int32_t i = 0; i < newVg.replica; ++i) {
45,210✔
2172
    mInfo("vgId:%d, trans:%d, vnode:%d dnode:%d", newVg.vgId, pTrans->id, i, newVg.vnodeGid[i].dnodeId);
31,207✔
2173
  }
2174

2175
  if (!force) {
14,003✔
2176
#if 1
2177
    {
2178
#else
2179
    if (newVg.replica == 1) {
2180
#endif
2181
      mInfo("vgId:%d, trans:%d, will add 1 vnode, replca:%d", pVgroup->vgId, pTrans->id, newVg.replica);
14,003✔
2182
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
14,003✔
2183
      for (int32_t i = 0; i < newVg.replica - 1; ++i) {
45,210✔
2184
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
31,207✔
2185
      }
2186
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]));
14,003✔
2187
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
14,003✔
2188

2189
      mInfo("vgId:%d, trans:%d, will remove 1 vnode, replca:2", pVgroup->vgId, pTrans->id);
14,003✔
2190
      newVg.replica--;
14,003✔
2191
      SVnodeGid del = newVg.vnodeGid[vnIndex];
14,003✔
2192
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
14,003✔
2193
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
14,003✔
2194
      {
2195
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
14,003✔
2196
        if (pRaw == NULL) {
14,003✔
UNCOV
2197
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2198
          if (terrno != 0) code = terrno;
×
UNCOV
2199
          TAOS_RETURN(code);
×
2200
        }
2201
        if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
14,003✔
2202
          sdbFreeRaw(pRaw);
×
UNCOV
2203
          TAOS_RETURN(code);
×
2204
        }
2205
        code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
14,003✔
2206
        if (code != 0) {
14,003✔
UNCOV
2207
          mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2208
          return code;
×
2209
        }
2210
      }
2211

2212
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true));
14,003✔
2213
      for (int32_t i = 0; i < newVg.replica; ++i) {
45,210✔
2214
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
31,207✔
2215
      }
2216
      TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
14,003✔
2217
#if 1
2218
    }
2219
#else
2220
    } else {  // new replica == 3
2221
      mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
2222
      if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
2223
      mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
2224
      newVg.replica--;
2225
      SVnodeGid del = newVg.vnodeGid[vnIndex];
2226
      newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
2227
      memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
2228
      {
2229
        SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
2230
        if (pRaw == NULL) return -1;
2231
        if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
2232
          sdbFreeRaw(pRaw);
2233
          return -1;
2234
        }
2235
      }
2236

2237
      if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
2238
      for (int32_t i = 0; i < newVg.replica; ++i) {
2239
        if (i == vnIndex) continue;
2240
        if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
2241
      }
2242
      if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
2243
      if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
2244
    }
2245
#endif
2246
  } else {
UNCOV
2247
    mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
×
UNCOV
2248
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray));
×
UNCOV
2249
    newVg.replica--;
×
2250
    // SVnodeGid del = newVg.vnodeGid[vnIndex];
2251
    newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
×
2252
    memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
×
2253
    {
2254
      SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
×
2255
      if (pRaw == NULL) {
×
UNCOV
2256
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2257
        if (terrno != 0) code = terrno;
×
2258
        TAOS_RETURN(code);
×
2259
      }
2260
      if ((code = mndTransAppendGroupRedolog(pTrans, pRaw, pVgroup->vgId)) != 0) {
×
2261
        sdbFreeRaw(pRaw);
×
UNCOV
2262
        TAOS_RETURN(code);
×
2263
      }
2264
      code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
×
2265
      if (code != 0) {
×
UNCOV
2266
        mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
2267
        return code;
×
2268
      }
2269
    }
2270

UNCOV
2271
    for (int32_t i = 0; i < newVg.replica; ++i) {
×
UNCOV
2272
      if (i != vnIndex) {
×
UNCOV
2273
        TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId));
×
2274
      }
2275
    }
2276
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]));
×
UNCOV
2277
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg));
×
2278

2279
    if (newVg.replica == 1) {
×
2280
      if (force && !unsafe) {
×
UNCOV
2281
        TAOS_RETURN(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE);
×
2282
      }
2283

2284
      SSdb *pSdb = pMnode->pSdb;
×
UNCOV
2285
      void *pIter = NULL;
×
2286

2287
      while (1) {
×
2288
        SStbObj *pStb = NULL;
×
UNCOV
2289
        pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
×
2290
        if (pIter == NULL) break;
×
2291

2292
        if (strcmp(pStb->db, pDb->name) == 0) {
×
2293
          if ((code = mndSetForceDropCreateStbRedoActions(pMnode, pTrans, &newVg, pStb)) != 0) {
×
UNCOV
2294
            sdbCancelFetch(pSdb, pIter);
×
2295
            sdbRelease(pSdb, pStb);
×
2296
            TAOS_RETURN(code);
×
2297
          }
2298
        }
2299

UNCOV
2300
        sdbRelease(pSdb, pStb);
×
2301
      }
2302

2303
      mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
×
2304
    }
2305
  }
2306

2307
  {
2308
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
14,003✔
2309
    if (pRaw == NULL) {
14,003✔
UNCOV
2310
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2311
      if (terrno != 0) code = terrno;
×
UNCOV
2312
      TAOS_RETURN(code);
×
2313
    }
2314
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
14,003✔
2315
      sdbFreeRaw(pRaw);
×
UNCOV
2316
      TAOS_RETURN(code);
×
2317
    }
2318
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
14,003✔
2319
    if (code != 0) {
14,003✔
UNCOV
2320
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2321
      return code;
×
2322
    }
2323
  }
2324

2325
  mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
14,003✔
2326
  for (int32_t i = 0; i < newVg.replica; ++i) {
45,210✔
2327
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
31,207✔
2328
  }
2329
  TAOS_RETURN(code);
14,003✔
2330
}
2331

2332
// For every vgroup that has a member on dnode delDnodeId, adds the actions that move that member
// elsewhere (see mndSetMoveVgroupInfoToTrans) to pTrans.
//
// Candidate target dnodes come from mndBuildDnodesArray(pMnode, delDnodeId, NULL). Iteration stops at
// the first vgroup that fails, and that error is returned.
//
// Returns 0 when all affected vgroups were handled (or none were affected). If the dnode array or a
// vgroup's database cannot be obtained, returns terrno when it is set, otherwise
// TSDB_CODE_MND_RETURN_VALUE_NULL.
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
  int32_t code = 0;
  SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // Locate the member of this vgroup that lives on the dnode being vacated, if any.
    int32_t vnIndex = -1;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
        vnIndex = i;
        break;
      }
    }

    code = 0;
    if (vnIndex != -1) {
      mInfo("vgId:%d, trans:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, pTrans->id, vnIndex,
            delDnodeId, force);
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        // The callee reads pDb->name and forwards pDb to request builders; fail instead of passing NULL.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
      } else {
        code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force, unsafe);
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pMnode->pSdb, pVgroup);

    if (code != 0) {
      // Leaving the loop early: the iterator must be cancelled explicitly.
      sdbCancelFetch(pMnode->pSdb, pIter);
      break;
    }
  }

  taosArrayDestroy(pArray);
  TAOS_RETURN(code);
}
2375

2376
// Grows *pVgroup by one member on dnode newDnodeId and queues the matching work on pTrans.
//
// *pVgroup is modified in place: the slot at index `replica` is filled in and `replica` is incremented.
// Steps, in order:
//   1. the new member is recorded as OFFLINE / LEARNER and the vgroup is appended as a redo log
//      with status SDB_STATUS_READY;
//   2. every pre-existing member gets an alter-replica action, then the new vnode is created;
//   3. the new member's role is switched to VOTER, an alter-type action is queued for it, and the
//      pre-existing members get a second alter-replica action;
//   4. an alter-confirm action is queued.
//
// Returns 0 on success, otherwise the first failing step's code. When encoding returns NULL the code
// is terrno if set, else TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                             int32_t newDnodeId) {
  mInfo("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);

  // Take the first unused slot for the new member.
  SVnodeGid *pAdded = &pVgroup->vnodeGid[pVgroup->replica];
  pVgroup->replica += 1;
  pAdded->dnodeId = newDnodeId;
  pAdded->syncState = TAOS_SYNC_STATE_OFFLINE;
  pAdded->nodeRole = TAOS_SYNC_ROLE_LEARNER;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  int32_t code = mndTransAppendRedolog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // Learner phase: notify the members that existed before the addition, then create the new vnode.
  int32_t peer = 0;
  while (peer < pVgroup->replica - 1) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[peer].dnodeId));
    ++peer;
  }
  TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pAdded));

  // Voter phase: the role must be changed before the requests below are built from pVgroup.
  pAdded->nodeRole = TAOS_SYNC_ROLE_VOTER;
  TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, pVgroup, pAdded->dnodeId));
  peer = 0;
  while (peer < pVgroup->replica - 1) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[peer].dnodeId));
    ++peer;
  }

  // Confirm phase.
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2422

2423
// Shrinks *pVgroup by removing its member on dnode delDnodeId and queues the matching work on pTrans.
//
// *pVgroup is modified in place: the removed slot is overwritten with the last member, the last slot
// is zeroed and `replica` is decremented. The shrunk vgroup is appended as a redo log with status
// SDB_STATUS_READY, a redo drop-vnode action is queued for the removed member, every remaining member
// gets an alter-replica action, and an alter-confirm action is queued last.
//
// Returns 0 on success, and also 0 without doing anything when no member lives on delDnodeId.
// Otherwise returns the first failing step's code; when encoding returns NULL the code is terrno if
// set, else TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                               int32_t delDnodeId) {
  int32_t code = 0;
  mInfo("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);

  SVnodeGid *pGid = NULL;
  SVnodeGid  delGid = {0};
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
      pGid = &pVgroup->vnodeGid[i];
      break;
    }
  }

  if (pGid == NULL) return 0;

  pVgroup->replica--;
  // Struct assignment rather than memcpy: when the removed member is the last one, source and
  // destination are the same object, which memcpy does not permit but assignment does.
  delGid = *pGid;
  *pGid = pVgroup->vnodeGid[pVgroup->replica];
  memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));

  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status since %s at line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  // delGid is a copy taken before the slot was overwritten, so it still names the removed member.
  TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true));
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    TAOS_CHECK_RETURN(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId));
  }
  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup));

  TAOS_RETURN(code);
}
2468

2469
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
35,747✔
2470
                                     SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
2471
                                     SDnodeObj *pOld3) {
2472
  int32_t code = -1;
35,747✔
2473
  STrans *pTrans = NULL;
35,747✔
2474

2475
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
35,747✔
2476
  if (pTrans == NULL) {
35,747✔
UNCOV
2477
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2478
    if (terrno != 0) code = terrno;
×
UNCOV
2479
    goto _OVER;
×
2480
  }
2481

2482
  mndTransSetDbName(pTrans, pVgroup->dbName, NULL);
35,747✔
2483
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
35,747✔
2484
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);
35,688✔
2485

2486
  mndTransSetSerial(pTrans);
35,688✔
2487
  mInfo("trans:%d, used to redistribute vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
35,688✔
2488

2489
  SVgObj newVg = {0};
35,688✔
2490
  memcpy(&newVg, pVgroup, sizeof(SVgObj));
35,688✔
2491
  mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
35,688✔
2492
  for (int32_t i = 0; i < newVg.replica; ++i) {
120,558✔
2493
    mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
84,870✔
2494
          syncStr(newVg.vnodeGid[i].syncState));
2495
  }
2496

2497
  if (pNew1 != NULL && pOld1 != NULL) {
35,688✔
2498
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew1->id);
35,688✔
2499
    if (numOfVnodes >= pNew1->numOfSupportVnodes) {
35,688✔
2500
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew1->id, numOfVnodes,
665✔
2501
             pNew1->numOfSupportVnodes);
2502
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
665✔
2503
      goto _OVER;
665✔
2504
    }
2505

2506
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
35,023✔
2507
    if (pNew1->memAvail - vgMem - pNew1->memUsed <= 0) {
35,023✔
UNCOV
2508
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2509
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed);
UNCOV
2510
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2511
      goto _OVER;
×
2512
    } else {
2513
      pNew1->memUsed += vgMem;
35,023✔
2514
    }
2515

2516
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id), NULL, _OVER);
35,023✔
2517
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id), NULL, _OVER);
35,023✔
2518
  }
2519

2520
  if (pNew2 != NULL && pOld2 != NULL) {
35,023✔
2521
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew2->id);
10,625✔
2522
    if (numOfVnodes >= pNew2->numOfSupportVnodes) {
10,625✔
UNCOV
2523
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew2->id, numOfVnodes,
×
2524
             pNew2->numOfSupportVnodes);
UNCOV
2525
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2526
      goto _OVER;
×
2527
    }
2528
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
10,625✔
2529
    if (pNew2->memAvail - vgMem - pNew2->memUsed <= 0) {
10,625✔
UNCOV
2530
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2531
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed);
UNCOV
2532
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2533
      goto _OVER;
×
2534
    } else {
2535
      pNew2->memUsed += vgMem;
10,625✔
2536
    }
2537
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id), NULL, _OVER);
10,625✔
2538
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id), NULL, _OVER);
10,625✔
2539
  }
2540

2541
  if (pNew3 != NULL && pOld3 != NULL) {
35,023✔
2542
    int32_t numOfVnodes = mndGetVnodesNum(pMnode, pNew3->id);
4,159✔
2543
    if (numOfVnodes >= pNew3->numOfSupportVnodes) {
4,159✔
UNCOV
2544
      mError("vgId:%d, no enough vnodes in dnode:%d, numOfVnodes:%d support:%d", newVg.vgId, pNew3->id, numOfVnodes,
×
2545
             pNew3->numOfSupportVnodes);
UNCOV
2546
      code = TSDB_CODE_MND_NO_ENOUGH_VNODES;
×
2547
      goto _OVER;
×
2548
    }
2549
    int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
4,159✔
2550
    if (pNew3->memAvail - vgMem - pNew3->memUsed <= 0) {
4,159✔
UNCOV
2551
      mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
×
2552
             pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
UNCOV
2553
      code = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
×
2554
      goto _OVER;
×
2555
    } else {
2556
      pNew3->memUsed += vgMem;
4,159✔
2557
    }
2558
    TAOS_CHECK_GOTO(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id), NULL, _OVER);
4,159✔
2559
    TAOS_CHECK_GOTO(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id), NULL, _OVER);
4,159✔
2560
  }
2561

2562
  {
2563
    SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
35,023✔
2564
    if (pRaw == NULL) {
35,023✔
UNCOV
2565
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2566
      if (terrno != 0) code = terrno;
×
UNCOV
2567
      goto _OVER;
×
2568
    }
2569
    if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
35,023✔
2570
      sdbFreeRaw(pRaw);
×
UNCOV
2571
      goto _OVER;
×
2572
    }
2573
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
35,023✔
2574
    if (code != 0) {
35,023✔
UNCOV
2575
      mError("vgId:%d, failed to set raw status since %s at line:%d", newVg.vgId, tstrerror(code), __LINE__);
×
UNCOV
2576
      goto _OVER;
×
2577
    }
2578
  }
2579

2580
  mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
35,023✔
2581
  for (int32_t i = 0; i < newVg.replica; ++i) {
117,898✔
2582
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
82,875✔
2583
  }
2584

2585
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
35,023✔
2586
  code = 0;
34,433✔
2587

2588
_OVER:
35,747✔
2589
  mndTransDrop(pTrans);
35,747✔
2590
  mndReleaseDb(pMnode, pDb);
35,747✔
2591
  TAOS_RETURN(code);
35,747✔
2592
}
2593

2594
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
43,612✔
2595
  SMnode    *pMnode = pReq->info.node;
43,612✔
2596
  SDnodeObj *pNew1 = NULL;
43,612✔
2597
  SDnodeObj *pNew2 = NULL;
43,612✔
2598
  SDnodeObj *pNew3 = NULL;
43,612✔
2599
  SDnodeObj *pOld1 = NULL;
43,612✔
2600
  SDnodeObj *pOld2 = NULL;
43,612✔
2601
  SDnodeObj *pOld3 = NULL;
43,612✔
2602
  SVgObj    *pVgroup = NULL;
43,612✔
2603
  SDbObj    *pDb = NULL;
43,612✔
2604
  int32_t    code = -1;
43,612✔
2605
  int64_t    curMs = taosGetTimestampMs();
43,612✔
2606
  int32_t    newDnodeId[3] = {0};
43,612✔
2607
  int32_t    oldDnodeId[3] = {0};
43,612✔
2608
  int32_t    newIndex = -1;
43,612✔
2609
  int32_t    oldIndex = -1;
43,612✔
2610
  int64_t    tss = taosGetTimestampMs();
43,612✔
2611

2612
  SRedistributeVgroupReq req = {0};
43,612✔
2613
  if (tDeserializeSRedistributeVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
43,612✔
UNCOV
2614
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2615
    goto _OVER;
×
2616
  }
2617

2618
  mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
43,612✔
2619
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_REDISTRIBUTE_VGROUP)) != 0) {
43,612✔
UNCOV
2620
    goto _OVER;
×
2621
  }
2622

2623
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
43,612✔
2624
  if (pVgroup == NULL) {
43,612✔
2625
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
1,995✔
2626
    if (terrno != 0) code = terrno;
1,995✔
2627
    goto _OVER;
1,995✔
2628
  }
2629
  if (pVgroup->mountVgId) {
41,617✔
UNCOV
2630
    code = TSDB_CODE_MND_MOUNT_OBJ_NOT_SUPPORT;
×
UNCOV
2631
    goto _OVER;
×
2632
  }
2633
  pDb = mndAcquireDb(pMnode, pVgroup->dbName);
41,617✔
2634
  if (pDb == NULL) {
41,617✔
UNCOV
2635
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2636
    if (terrno != 0) code = terrno;
×
UNCOV
2637
    goto _OVER;
×
2638
  }
2639

2640
  if (pVgroup->replica == 1) {
41,617✔
2641
    if (req.dnodeId1 <= 0 || req.dnodeId2 > 0 || req.dnodeId3 > 0) {
10,471✔
UNCOV
2642
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2643
      goto _OVER;
×
2644
    }
2645

2646
    if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
10,471✔
2647
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
UNCOV
2648
      code = 0;
×
UNCOV
2649
      goto _OVER;
×
2650
    }
2651

2652
    pNew1 = mndAcquireDnode(pMnode, req.dnodeId1);
10,471✔
2653
    if (pNew1 == NULL) {
10,471✔
UNCOV
2654
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2655
      if (terrno != 0) code = terrno;
×
UNCOV
2656
      goto _OVER;
×
2657
    }
2658
    if (!mndIsDnodeOnline(pNew1, curMs)) {
10,471✔
2659
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2660
      goto _OVER;
×
2661
    }
2662

2663
    pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
10,471✔
2664
    if (pOld1 == NULL) {
10,471✔
UNCOV
2665
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2666
      if (terrno != 0) code = terrno;
×
UNCOV
2667
      goto _OVER;
×
2668
    }
2669
    if (!mndIsDnodeOnline(pOld1, curMs)) {
10,471✔
2670
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2671
      goto _OVER;
×
2672
    }
2673

2674
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL);
10,471✔
2675

2676
  } else if (pVgroup->replica == 3) {
31,146✔
2677
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0 || req.dnodeId3 <= 0) {
29,776✔
2678
      code = TSDB_CODE_MND_INVALID_REPLICA;
2,660✔
2679
      goto _OVER;
2,660✔
2680
    }
2681

2682
    if (req.dnodeId1 == req.dnodeId2 || req.dnodeId1 == req.dnodeId3 || req.dnodeId2 == req.dnodeId3) {
27,116✔
2683
      code = TSDB_CODE_MND_INVALID_REPLICA;
665✔
2684
      goto _OVER;
665✔
2685
    }
2686

2687
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId &&
26,451✔
2688
        req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId) {
12,474✔
2689
      newDnodeId[++newIndex] = req.dnodeId1;
11,144✔
2690
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
11,144✔
2691
    }
2692

2693
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
26,451✔
2694
        req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId) {
17,626✔
2695
      newDnodeId[++newIndex] = req.dnodeId2;
12,992✔
2696
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
12,992✔
2697
    }
2698

2699
    if (req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId &&
26,451✔
2700
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
21,152✔
2701
      newDnodeId[++newIndex] = req.dnodeId3;
16,996✔
2702
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
16,996✔
2703
    }
2704

2705
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId &&
26,451✔
2706
        req.dnodeId3 != pVgroup->vnodeGid[0].dnodeId) {
13,720✔
2707
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
12,418✔
2708
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
12,418✔
2709
    }
2710

2711
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId &&
26,451✔
2712
        req.dnodeId3 != pVgroup->vnodeGid[1].dnodeId) {
16,380✔
2713
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
12,383✔
2714
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
12,383✔
2715
    }
2716

2717
    if (req.dnodeId1 != pVgroup->vnodeGid[2].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[2].dnodeId &&
26,451✔
2718
        req.dnodeId3 != pVgroup->vnodeGid[2].dnodeId) {
20,487✔
2719
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[2].dnodeId;
16,331✔
2720
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
16,331✔
2721
    }
2722

2723
    if (newDnodeId[0] != 0) {
26,451✔
2724
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
25,620✔
2725
      if (pNew1 == NULL) {
25,620✔
UNCOV
2726
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2727
        if (terrno != 0) code = terrno;
×
UNCOV
2728
        goto _OVER;
×
2729
      }
2730
      if (!mndIsDnodeOnline(pNew1, curMs)) {
25,620✔
2731
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
665✔
2732
        goto _OVER;
665✔
2733
      }
2734
    }
2735

2736
    if (newDnodeId[1] != 0) {
25,786✔
2737
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
10,304✔
2738
      if (pNew2 == NULL) {
10,304✔
UNCOV
2739
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2740
        if (terrno != 0) code = terrno;
×
UNCOV
2741
        goto _OVER;
×
2742
      }
2743
      if (!mndIsDnodeOnline(pNew2, curMs)) {
10,304✔
2744
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2745
        goto _OVER;
×
2746
      }
2747
    }
2748

2749
    if (newDnodeId[2] != 0) {
25,786✔
2750
      pNew3 = mndAcquireDnode(pMnode, newDnodeId[2]);
5,208✔
2751
      if (pNew3 == NULL) {
5,208✔
UNCOV
2752
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2753
        if (terrno != 0) code = terrno;
×
UNCOV
2754
        goto _OVER;
×
2755
      }
2756
      if (!mndIsDnodeOnline(pNew3, curMs)) {
5,208✔
2757
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2758
        goto _OVER;
×
2759
      }
2760
    }
2761

2762
    if (oldDnodeId[0] != 0) {
25,786✔
2763
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
24,955✔
2764
      if (pOld1 == NULL) {
24,955✔
UNCOV
2765
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2766
        if (terrno != 0) code = terrno;
×
UNCOV
2767
        goto _OVER;
×
2768
      }
2769
      if (!mndIsDnodeOnline(pOld1, curMs)) {
24,955✔
2770
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
1,049✔
2771
        goto _OVER;
1,049✔
2772
      }
2773
    }
2774

2775
    if (oldDnodeId[1] != 0) {
24,737✔
2776
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
9,255✔
2777
      if (pOld2 == NULL) {
9,255✔
UNCOV
2778
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2779
        if (terrno != 0) code = terrno;
×
UNCOV
2780
        goto _OVER;
×
2781
      }
2782
      if (!mndIsDnodeOnline(pOld2, curMs)) {
9,255✔
2783
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2784
        goto _OVER;
×
2785
      }
2786
    }
2787

2788
    if (oldDnodeId[2] != 0) {
24,737✔
2789
      pOld3 = mndAcquireDnode(pMnode, oldDnodeId[2]);
4,159✔
2790
      if (pOld3 == NULL) {
4,159✔
UNCOV
2791
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2792
        if (terrno != 0) code = terrno;
×
UNCOV
2793
        goto _OVER;
×
2794
      }
2795
      if (!mndIsDnodeOnline(pOld3, curMs)) {
4,159✔
2796
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2797
        goto _OVER;
×
2798
      }
2799
    }
2800

2801
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
24,737✔
2802
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
2803
      code = 0;
831✔
2804
      goto _OVER;
831✔
2805
    }
2806

2807
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3);
23,906✔
2808

2809
  } else if (pVgroup->replica == 2) {
1,370✔
2810
    if (req.dnodeId1 <= 0 || req.dnodeId2 <= 0) {
1,370✔
UNCOV
2811
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2812
      goto _OVER;
×
2813
    }
2814

2815
    if (req.dnodeId1 == req.dnodeId2) {
1,370✔
UNCOV
2816
      code = TSDB_CODE_MND_INVALID_REPLICA;
×
UNCOV
2817
      goto _OVER;
×
2818
    }
2819

2820
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId) {
1,370✔
2821
      newDnodeId[++newIndex] = req.dnodeId1;
1,370✔
2822
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,370✔
2823
    }
2824

2825
    if (req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,370✔
2826
      newDnodeId[++newIndex] = req.dnodeId2;
1,370✔
2827
      mInfo("vgId:%d, dnode:%d will be added, index:%d", pVgroup->vgId, newDnodeId[newIndex], newIndex);
1,370✔
2828
    }
2829

2830
    if (req.dnodeId1 != pVgroup->vnodeGid[0].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[0].dnodeId) {
1,370✔
2831
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[0].dnodeId;
1,370✔
2832
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,370✔
2833
    }
2834

2835
    if (req.dnodeId1 != pVgroup->vnodeGid[1].dnodeId && req.dnodeId2 != pVgroup->vnodeGid[1].dnodeId) {
1,370✔
2836
      oldDnodeId[++oldIndex] = pVgroup->vnodeGid[1].dnodeId;
1,370✔
2837
      mInfo("vgId:%d, dnode:%d will be removed, index:%d", pVgroup->vgId, oldDnodeId[oldIndex], oldIndex);
1,370✔
2838
    }
2839

2840
    if (newDnodeId[0] != 0) {
1,370✔
2841
      pNew1 = mndAcquireDnode(pMnode, newDnodeId[0]);
1,370✔
2842
      if (pNew1 == NULL) {
1,370✔
UNCOV
2843
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2844
        if (terrno != 0) code = terrno;
×
UNCOV
2845
        goto _OVER;
×
2846
      }
2847
      if (!mndIsDnodeOnline(pNew1, curMs)) {
1,370✔
2848
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2849
        goto _OVER;
×
2850
      }
2851
    }
2852

2853
    if (newDnodeId[1] != 0) {
1,370✔
2854
      pNew2 = mndAcquireDnode(pMnode, newDnodeId[1]);
1,370✔
2855
      if (pNew2 == NULL) {
1,370✔
UNCOV
2856
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2857
        if (terrno != 0) code = terrno;
×
UNCOV
2858
        goto _OVER;
×
2859
      }
2860
      if (!mndIsDnodeOnline(pNew2, curMs)) {
1,370✔
2861
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2862
        goto _OVER;
×
2863
      }
2864
    }
2865

2866
    if (oldDnodeId[0] != 0) {
1,370✔
2867
      pOld1 = mndAcquireDnode(pMnode, oldDnodeId[0]);
1,370✔
2868
      if (pOld1 == NULL) {
1,370✔
UNCOV
2869
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2870
        if (terrno != 0) code = terrno;
×
UNCOV
2871
        goto _OVER;
×
2872
      }
2873
      if (!mndIsDnodeOnline(pOld1, curMs)) {
1,370✔
2874
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2875
        goto _OVER;
×
2876
      }
2877
    }
2878

2879
    if (oldDnodeId[1] != 0) {
1,370✔
2880
      pOld2 = mndAcquireDnode(pMnode, oldDnodeId[1]);
1,370✔
2881
      if (pOld2 == NULL) {
1,370✔
UNCOV
2882
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
2883
        if (terrno != 0) code = terrno;
×
UNCOV
2884
        goto _OVER;
×
2885
      }
2886
      if (!mndIsDnodeOnline(pOld2, curMs)) {
1,370✔
2887
        code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
×
UNCOV
2888
        goto _OVER;
×
2889
      }
2890
    }
2891

2892
    if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL) {
1,370✔
2893
      // terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
UNCOV
2894
      code = 0;
×
UNCOV
2895
      goto _OVER;
×
2896
    }
2897

2898
    code = mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, NULL, NULL);
1,370✔
2899
  } else {
UNCOV
2900
    code = TSDB_CODE_MND_REQ_REJECTED;
×
UNCOV
2901
    goto _OVER;
×
2902
  }
2903

2904
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
35,747✔
2905

2906
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
35,747✔
2907
    char obj[33] = {0};
35,747✔
2908
    (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
35,747✔
2909

2910
    int64_t tse = taosGetTimestampMs();
35,747✔
2911
    double  duration = (double)(tse - tss);
35,747✔
2912
    duration = duration / 1000;
35,747✔
2913
    auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen, duration, 0);
35,747✔
2914
  }
2915
_OVER:
43,612✔
2916
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
43,612✔
2917
    mError("vgId:%d, failed to redistribute to dnode %d:%d:%d since %s", req.vgId, req.dnodeId1, req.dnodeId2,
8,348✔
2918
           req.dnodeId3, tstrerror(code));
2919
  }
2920

2921
  mndReleaseDnode(pMnode, pNew1);
43,612✔
2922
  mndReleaseDnode(pMnode, pNew2);
43,612✔
2923
  mndReleaseDnode(pMnode, pNew3);
43,612✔
2924
  mndReleaseDnode(pMnode, pOld1);
43,612✔
2925
  mndReleaseDnode(pMnode, pOld2);
43,612✔
2926
  mndReleaseDnode(pMnode, pOld3);
43,612✔
2927
  mndReleaseVgroup(pMnode, pVgroup);
43,612✔
2928
  mndReleaseDb(pMnode, pDb);
43,612✔
2929
  tFreeSRedistributeVgroupReq(&req);
43,612✔
2930

2931
  TAOS_RETURN(code);
43,612✔
2932
}
2933

2934
/*
 * Build the message body of a force-become-follower request for pVgroup:
 * an SMsgHead immediately followed by the serialized SForceBecomeFollowerReq.
 *
 * pMnode and dnodeId are accepted but not read by this function.
 *
 * Returns a buffer obtained from taosMemoryMalloc (the caller releases it
 * with taosMemoryFree) and stores the total length, head included, in
 * *pContLen. On any failure returns NULL, sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and leaves *pContLen untouched.
 */
static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) {
  SForceBecomeFollowerReq balanceReq = {
      .vgId = pVgroup->vgId,
  };

  // Called with a NULL buffer to obtain the encoded payload length.
  int32_t bodyLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The head carries the total length and the vgroup id in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload starts right after the head, so only bodyLen bytes remain in
  // the allocation; passing contLen here would overstate the usable space by
  // sizeof(SMsgHead).
  if (tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), bodyLen, &balanceReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReq);
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
2964

2965
/*
 * Append a TDMT_SYNC_FORCE_FOLLOWER redo action to pTrans, addressed to the
 * endpoint set of dnode dnodeId and carrying a force-become-follower request
 * built for pVgroup.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno when it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. If appending the action fails, the request
 * buffer is freed here and the append error is returned.
 */
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // The dnode object is only needed for its endpoint set; release it right away.
  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &msgLen);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_SYNC_FORCE_FOLLOWER;
  redo.pCont = pMsg;
  redo.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // The action was not appended, so the buffer is still ours to free.
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
2997

2998
/*
 * Serialize an SAlterVnodeElectBaselineReq carrying pVgroup's vgId and the
 * election baseline `ms` (all other fields zero) into a freshly allocated
 * buffer. No SMsgHead is prepended.
 *
 * pMnode, pDb and dnodeId are accepted but not read by this function.
 *
 * Returns the buffer (to be released with taosMemoryFree) and stores its
 * length in *pContLen. On failure returns NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY and *pContLen untouched.
 */
static void *mndBuildAlterVnodeElectBaselineReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
                                          int32_t *pContLen, int32_t ms) {
  SAlterVnodeElectBaselineReq baselineReq = {0};
  baselineReq.vgId = pVgroup->vgId;
  baselineReq.electBaseLine = ms;

  // Called with a NULL buffer to obtain the encoded length.
  int32_t encodedLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &baselineReq);
  void   *pBuf = (encodedLen < 0) ? NULL : taosMemoryMalloc(encodedLen);
  if (pBuf == NULL) {
    // Both a sizing failure and an allocation failure are reported the same way.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (tSerializeSAlterVnodeReplicaReq(pBuf, encodedLen, &baselineReq) >= 0) {
    *pContLen = encodedLen;
    return pBuf;
  }

  mError("vgId:%d, failed to serialize alter vnode req,since %s", baselineReq.vgId, terrstr());
  taosMemoryFree(pBuf);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
3026

3027
/*
 * Append a TDMT_VND_ALTER_ELECTBASELINE redo action to pTrans, addressed to
 * the endpoint set of dnode dnodeId, asking it to use election baseline `ms`
 * for pVgroup. The action's groupId is set to the vgroup id.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno when it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. If appending the action fails, the request
 * buffer is freed here and the append error is returned.
 */
static int32_t mndAddAlterVnodeElectionBaselineActionToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                                             SVgObj *pVgroup, int32_t dnodeId, int32_t ms) {
  int32_t code = 0;

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeId);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  // The dnode object is only needed for its endpoint set; release it right away.
  STransAction redo = {0};
  redo.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildAlterVnodeElectBaselineReq(pMnode, pDb, pVgroup, dnodeId, &msgLen, ms);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  redo.msgType = TDMT_VND_ALTER_ELECTBASELINE;
  redo.groupId = pVgroup->vgId;
  redo.pCont = pMsg;
  redo.contLen = msgLen;

  code = mndTransAppendRedoAction(pTrans, &redo);
  if (code != 0) {
    // The action was not appended, so the buffer is still ours to free.
    taosMemoryFree(pMsg);
  }

  TAOS_RETURN(code);
}
3060

3061
/*
 * Append one election-baseline action per vnode slot of pVgroup to pTrans.
 * The slot equal to `index % 3` gets a 1500 ms baseline, every other slot
 * gets 5000 ms.
 *
 * C's `%` truncates toward zero, so a negative index such as -1 yields -1,
 * which matches no slot: all three slots then receive 5000 ms.
 *
 * Returns -1 without adding anything when the vgroup has at most one replica,
 * the first error from the per-vnode helper otherwise, and 0 on success.
 *
 * NOTE(review): the loop always visits slots 0..2 regardless of
 * pVgroup->replica - confirm callers only pass 3-replica vgroups.
 */
static int32_t mndAddAlterVgroupElectionBaselineActionToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t vgId = pVgroup->vgId;
  int8_t  replicaCnt = pVgroup->replica;

  if (replicaCnt <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgId, replicaCnt);
    return -1;
  }

  for (int32_t slot = 0; slot < 3; slot++) {
    int32_t baselineMs = 5000;
    if (slot == index % 3) {
      mInfo("trans:%d, balance leader to dnode:%d", pTrans->id, pVgroup->vnodeGid[slot].dnodeId);
      baselineMs = 1500;
    }
    TAOS_CHECK_RETURN(mndAddAlterVnodeElectionBaselineActionToTrans(pMnode, pTrans, NULL, pVgroup,
                                                                    pVgroup->vnodeGid[slot].dnodeId, baselineMs));
  }

  return 0;
}
3086

3087
/*
 * Add to pTrans the actions that move leadership of pVgroup away from its
 * current leader.
 *
 * The leader is the first replica whose syncState is TAOS_SYNC_STATE_LEADER.
 * When that dnode exists and is online the following are appended, in order:
 *   1. election-baseline actions using `index`,
 *   2. a force-become-follower action sent to the leader's dnode,
 *   3. an alter-vnode confirm action,
 *   4. election-baseline actions using index -1.
 * Afterwards the vgroup's database is acquired and released; if it cannot be
 * acquired, an error is returned.
 *
 * Returns -1 when the vgroup has at most one replica. Returns 0 (after
 * logging) when no usable leader dnode was found. Otherwise returns the first
 * error encountered, or 0 on success.
 */
int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTrans, int32_t index) {
  int32_t code = 0;
  int32_t vgId = pVgroup->vgId;
  int8_t  replicaCnt = pVgroup->replica;

  if (replicaCnt <= 1) {
    mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgId, replicaCnt);
    return -1;
  }

  // Stays 0 when no replica currently reports itself as leader.
  int32_t leaderDnodeId = 0;
  for (int i = 0; i < replicaCnt; i++) {
    if (pVgroup->vnodeGid[i].syncState != TAOS_SYNC_STATE_LEADER) continue;
    leaderDnodeId = pVgroup->vnodeGid[i].dnodeId;
    break;
  }

  bool       exist = false;
  bool       online = false;
  int64_t    nowMs = taosGetTimestampMs();
  SDnodeObj *pLeader = mndAcquireDnode(pMnode, leaderDnodeId);
  if (pLeader != NULL) {
    exist = true;
    online = mndIsDnodeOnline(pLeader, nowMs);
    mndReleaseDnode(pMnode, pLeader);
  }

  if (!exist || !online) {
    mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgId, leaderDnodeId,
          exist, online);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, vgid:%d force drop leader from dnode:%d", pTrans->id, vgId, leaderDnodeId);
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, index));

  code = mndAddBalanceVgroupLeaderAction(pMnode, pTrans, pVgroup, leaderDnodeId);
  if (code != 0) {
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgId, leaderDnodeId);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, NULL, pVgroup));

  // Index -1 matches no slot, so every vnode gets the 5000 ms baseline.
  TAOS_CHECK_RETURN(mndAddAlterVgroupElectionBaselineActionToTrans(pMnode, pVgroup, pTrans, -1));

  // The database object is not used; this only verifies that it can be acquired.
  SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
  if (pDb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgId,
           leaderDnodeId);
    TAOS_RETURN(code);
  }
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
3147

3148
// Implementation hook for the balance-vgroup-leader message. This file only
// defines it when TD_ENTERPRISE is not set (see the stub below).
extern int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq);

// Entry point for the balance-vgroup-leader message: forwards to the
// implementation hook and returns its result unchanged.
int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq) {
  int32_t rc = mndProcessVgroupBalanceLeaderMsgImp(pReq);
  return rc;
}

#ifndef TD_ENTERPRISE
// Stub used when TD_ENTERPRISE is not defined: ignores the request and reports success.
int32_t mndProcessVgroupBalanceLeaderMsgImp(SRpcMsg *pReq) {
  return 0;
}
#endif
3155

3156
/*
 * For every dnode in pArray, replace the memory accounted for the old vgroup
 * layout with that of the new layout and check that the dnode still has
 * memory left.
 *
 * For each dnode that hosts a replica of pOldVgroup, mndGetVgroupMemory for
 * (pOldDb, pOldVgroup) is subtracted from its memUsed; for each dnode that
 * hosts a replica of pNewVgroup, mndGetVgroupMemory for (pNewDb, pNewVgroup)
 * is added. The dnode objects in pArray are updated in place, including the
 * ones processed before a failure.
 *
 * Returns TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE as soon as a dnode ends up
 * with memAvail - memUsed <= 0, otherwise 0.
 */
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
                                   SVgObj *pNewVgroup, SArray *pArray) {
  for (int32_t idx = 0; idx < (int32_t)taosArrayGetSize(pArray); ++idx) {
    SDnodeObj *pDnode = taosArrayGet(pArray, idx);
    int64_t    releasedMem = 0;  // accounted for the old layout on this dnode
    int64_t    claimedMem = 0;   // accounted for the new layout on this dnode
    bool       hostsReplica = false;

    mDebug("db:%s, vgId:%d, check dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
           pDnode->id, pDnode->memAvail, pDnode->memUsed);

    for (int32_t r = 0; r < pOldVgroup->replica; ++r) {
      if (pOldVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      releasedMem = mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
      hostsReplica = true;
    }

    for (int32_t r = 0; r < pNewVgroup->replica; ++r) {
      if (pNewVgroup->vnodeGid[r].dnodeId != pDnode->id) continue;
      claimedMem = mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
      hostsReplica = true;
    }

    mDebug("db:%s, vgId:%d, memory in dnode:%d, oldUsed:%" PRId64 ", newUsed:%" PRId64, pNewVgroup->dbName,
           pNewVgroup->vgId, pDnode->id, releasedMem, claimedMem);

    pDnode->memUsed = pDnode->memUsed - releasedMem + claimedMem;

    if (pDnode->memAvail - pDnode->memUsed <= 0) {
      mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
             pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
      TAOS_RETURN(TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE);
    }

    // Only dnodes that take part in the old or new layout are worth an info line.
    if (hostsReplica) {
      mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName, pNewVgroup->vgId,
            pDnode->id, pDnode->memAvail, pDnode->memUsed);
    }
  }

  return 0;
}
3196

3197
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
222,744✔
3198
                                  SArray *pArray, SVgObj *pNewVgroup) {
3199
  int32_t code = 0;
222,744✔
3200
  memcpy(pNewVgroup, pVgroup, sizeof(SVgObj));
222,744✔
3201

3202
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
222,744✔
3203
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
187,098✔
3204
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, pNewVgroup, pVgroup, pArray));
187,098✔
3205
    return 0;
187,098✔
3206
  }
3207

3208
  // mndTransSetGroupParallel(pTrans);
3209

3210
  if (pNewDb->cfg.replications == 3) {
35,646✔
3211
    mInfo("trans:%d, db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
30,956✔
3212
          pVgroup->vnodeGid[0].dnodeId);
3213

3214
    // add second
3215
    if (pNewVgroup->replica == 1) {
30,956✔
3216
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
30,956✔
3217
    }
3218

3219
    // learner stage
3220
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
30,272✔
3221
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
30,272✔
3222
    TAOS_CHECK_RETURN(
30,272✔
3223
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3224

3225
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
30,272✔
3226

3227
    // follower stage
3228
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
30,272✔
3229
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
30,272✔
3230
    TAOS_CHECK_RETURN(
30,272✔
3231
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3232

3233
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
30,272✔
3234

3235
    // add third
3236
    if (pNewVgroup->replica == 2) {
30,272✔
3237
      TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
30,272✔
3238
    }
3239

3240
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,635✔
3241
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,635✔
3242
    pNewVgroup->vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
27,635✔
3243
    TAOS_CHECK_RETURN(
27,635✔
3244
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3245
    TAOS_CHECK_RETURN(
27,635✔
3246
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3247
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
27,635✔
3248

3249
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
27,635✔
3250
  } else if (pNewDb->cfg.replications == 1) {
4,690✔
3251
    mInfo("trans:%d, db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pTrans->id,
3,318✔
3252
          pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId,
3253
          pVgroup->vnodeGid[2].dnodeId);
3254

3255
    SVnodeGid del1 = {0};
3,318✔
3256
    SVnodeGid del2 = {0};
3,318✔
3257
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del1));
3,318✔
3258
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del1, true));
3,318✔
3259
    TAOS_CHECK_RETURN(
3,318✔
3260
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3261
    TAOS_CHECK_RETURN(
3,318✔
3262
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
3263
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,318✔
3264

3265
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
3,318✔
3266
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
3,318✔
3267
    TAOS_CHECK_RETURN(
3,318✔
3268
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3269
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
3,318✔
3270
  } else if (pNewDb->cfg.replications == 2) {
1,372✔
3271
    mInfo("trans:%d, db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
1,372✔
3272
          pVgroup->vnodeGid[0].dnodeId);
3273

3274
    // add second
3275
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
1,372✔
3276

3277
    // learner stage
3278
    pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,372✔
3279
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
1,372✔
3280
    TAOS_CHECK_RETURN(
1,372✔
3281
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3282

3283
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[1]));
1,372✔
3284

3285
    // follower stage
3286
    pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
1,372✔
3287
    TAOS_CHECK_RETURN(mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[1].dnodeId));
1,372✔
3288
    TAOS_CHECK_RETURN(
1,372✔
3289
        mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
3290

3291
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
1,372✔
3292
  } else {
UNCOV
3293
    return -1;
×
3294
  }
3295

3296
  mndSortVnodeGid(pNewVgroup);
32,325✔
3297

3298
  {
3299
    SSdbRaw *pVgRaw = mndVgroupActionEncode(pNewVgroup);
32,325✔
3300
    if (pVgRaw == NULL) {
32,325✔
UNCOV
3301
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
3302
      if (terrno != 0) code = terrno;
×
UNCOV
3303
      TAOS_RETURN(code);
×
3304
    }
3305
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
32,325✔
3306
      sdbFreeRaw(pVgRaw);
×
UNCOV
3307
      TAOS_RETURN(code);
×
3308
    }
3309
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
32,325✔
3310
    if (code != 0) {
32,325✔
UNCOV
3311
      mError("vgId:%d, failed to set raw status since %s at line:%d", pNewVgroup->vgId, tstrerror(code), __LINE__);
×
UNCOV
3312
      TAOS_RETURN(code);
×
3313
    }
3314
  }
3315

3316
  TAOS_RETURN(code);
32,325✔
3317
}
3318

UNCOV
3319
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
×
3320
                                      SArray *pArray) {
UNCOV
3321
  int32_t code = 0;
×
3322
  SVgObj  newVgroup = {0};
×
UNCOV
3323
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
×
3324

3325
  if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
×
3326
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup));
×
UNCOV
3327
    TAOS_CHECK_RETURN(mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray));
×
3328
    return 0;
×
3329
  }
3330

3331
  mndTransSetSerial(pTrans);
×
3332

UNCOV
3333
  mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3334
        pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
3335

3336
  if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
×
UNCOV
3337
    mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
×
3338
          pVgroup->vnodeGid[0].dnodeId);
3339

3340
    // add second
UNCOV
3341
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3342
    // add third
UNCOV
3343
    TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray));
×
3344

3345
    // add learner stage
3346
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3347
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
UNCOV
3348
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3349
    TAOS_CHECK_RETURN(
×
3350
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3351
    mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id,
×
3352
          pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
UNCOV
3353
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]));
×
3354
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3355
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3356
    TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]));
×
3357
    mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId,
×
3358
          newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
3359

3360
    // check learner
UNCOV
3361
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3362
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3363
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3364
    TAOS_CHECK_RETURN(
×
3365
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId));
3366
    TAOS_CHECK_RETURN(
×
3367
        mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId));
3368

3369
    // change raft type
UNCOV
3370
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3371
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3372
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3373
    TAOS_CHECK_RETURN(
×
3374
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3375

3376
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3377

UNCOV
3378
    newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3379
    newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
UNCOV
3380
    newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3381
    TAOS_CHECK_RETURN(
×
3382
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3383

3384
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3385

UNCOV
3386
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3387
    if (pVgRaw == NULL) {
×
UNCOV
3388
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3389
      if (terrno != 0) code = terrno;
×
3390
      TAOS_RETURN(code);
×
3391
    }
3392
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3393
      sdbFreeRaw(pVgRaw);
×
UNCOV
3394
      TAOS_RETURN(code);
×
3395
    }
3396
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3397
    if (code != 0) {
×
UNCOV
3398
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3399
             __LINE__);
3400
      TAOS_RETURN(code);
×
3401
    }
UNCOV
3402
  } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
×
3403
    mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
×
3404
          pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
3405

3406
    SVnodeGid del1 = {0};
×
UNCOV
3407
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1));
×
3408

3409
    TAOS_CHECK_RETURN(
×
3410
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3411

3412
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3413

UNCOV
3414
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true));
×
3415

UNCOV
3416
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3417
    if (pVgRaw == NULL) {
×
UNCOV
3418
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3419
      if (terrno != 0) code = terrno;
×
3420
      TAOS_RETURN(code);
×
3421
    }
3422
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3423
      sdbFreeRaw(pVgRaw);
×
UNCOV
3424
      TAOS_RETURN(code);
×
3425
    }
3426
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3427
    if (code != 0) {
×
UNCOV
3428
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3429
             __LINE__);
3430
      TAOS_RETURN(code);
×
3431
    }
3432

3433
    SVnodeGid del2 = {0};
×
UNCOV
3434
    TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2));
×
3435

3436
    TAOS_CHECK_RETURN(
×
3437
        mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId));
3438

3439
    TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup));
×
3440

UNCOV
3441
    TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true));
×
3442

UNCOV
3443
    pVgRaw = mndVgroupActionEncode(&newVgroup);
×
3444
    if (pVgRaw == NULL) {
×
UNCOV
3445
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3446
      if (terrno != 0) code = terrno;
×
3447
      TAOS_RETURN(code);
×
3448
    }
3449
    if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
×
3450
      sdbFreeRaw(pVgRaw);
×
UNCOV
3451
      TAOS_RETURN(code);
×
3452
    }
3453
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3454
    if (code != 0) {
×
UNCOV
3455
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3456
             __LINE__);
3457
      TAOS_RETURN(code);
×
3458
    }
3459
  } else {
3460
    return -1;
×
3461
  }
3462

3463
  mndSortVnodeGid(&newVgroup);
×
3464

3465
  {
3466
    SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
×
UNCOV
3467
    if (pVgRaw == NULL) {
×
UNCOV
3468
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
3469
      if (terrno != 0) code = terrno;
×
3470
      TAOS_RETURN(code);
×
3471
    }
3472
    if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
×
3473
      sdbFreeRaw(pVgRaw);
×
UNCOV
3474
      TAOS_RETURN(code);
×
3475
    }
3476
    code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
×
3477
    if (code != 0) {
×
UNCOV
3478
      mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code),
×
3479
             __LINE__);
3480
      TAOS_RETURN(code);
×
3481
    }
3482
  }
3483

UNCOV
3484
  TAOS_RETURN(code);
×
3485
}
3486

3487
/*
 * Queue the transaction actions used to restore the vnode of `pVgroup` that lives on `pDnode`,
 * then append the (role-adjusted) vgroup row to the commit log with status READY.
 *
 * The action sequence depends on the replica count of the vgroup:
 *   1 replica : a single create-vnode action for slot 0.
 *   2 replicas: alter-type on `pAnotherDnode`, create on `pDnode` as learner, then alter-type on
 *               both dnodes once every member is marked voter.
 *   3 replicas: create on `pDnode` as learner, then alter-type on `pDnode` with all voters.
 * Any other replica count queues no vnode action and only writes the commit log.
 *
 * Works on a private copy of `pVgroup`; the caller's object is not modified here.
 * Returns 0 on success or the first non-zero error code encountered.
 */
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode,
                                         SDnodeObj *pAnotherDnode) {
  int32_t code = 0;
  SVgObj  newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));

  mInfo("trans:%d, db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pTrans->id, pVgroup->dbName, pVgroup->vgId,
        pVgroup->vnodeGid[0].dnodeId);

  switch (newVgroup.replica) {
    case 1: {
      // With exactly one replica, slot 0 is the only member and therefore the one to recreate.
      newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
      TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[0]));
      break;
    }

    case 2: {
      // The member being restored is a learner, the surviving one stays voter.
      for (int idx = 0; idx < newVgroup.replica; ++idx) {
        SVnodeGid *pGid = &newVgroup.vnodeGid[idx];
        pGid->nodeRole = (pGid->dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));

      // Roles are re-applied before the create action, exactly as before the alter action.
      for (int idx = 0; idx < newVgroup.replica; ++idx) {
        SVnodeGid *pGid = &newVgroup.vnodeGid[idx];
        pGid->nodeRole = (pGid->dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      // Finally every member becomes a voter and both dnodes are told about it.
      for (int idx = 0; idx < newVgroup.replica; ++idx) {
        newVgroup.vnodeGid[idx].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pAnotherDnode));
      break;
    }

    case 3: {
      for (int idx = 0; idx < newVgroup.replica; ++idx) {
        SVnodeGid *pGid = &newVgroup.vnodeGid[idx];
        pGid->nodeRole = (pGid->dnodeId == pDnode->id) ? TAOS_SYNC_ROLE_LEARNER : TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode));

      for (int idx = 0; idx < newVgroup.replica; ++idx) {
        newVgroup.vnodeGid[idx].nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
      TAOS_CHECK_RETURN(mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode));
      break;
    }

    default:
      break;
  }

  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was not accepted by the transaction, so it is released here.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", newVgroup.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3566

UNCOV
3567
/*
 * Stub: appends nothing to the transaction and always reports success (0).
 * All parameters are currently unused.
 */
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  (void)pMnode;
  (void)pTrans;
  (void)pDb;
  (void)pVgroup;
  return 0;
}
3570

3571
// Callback that appends an encoded SDB row to a transaction log; in this file it is bound to
// either mndTransAppendCommitlog or mndTransAppendRedolog depending on the requested stage.
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
3572

3573
/*
 * Encode `pVg` and append it to the transaction with SDB status `vgStatus`.
 *
 * stage == TRN_STAGE_COMMIT_ACTION appends to the commit log; any other stage appends to the
 * redo log.
 *
 * Ownership: the encoded raw is released here only while the transaction has not accepted it.
 * Once the append succeeds the raw is referenced by the transaction, so it must not be freed on
 * a later failure (freeing it would leave the transaction with a dangling pointer). This matches
 * the other vgroup builders in this file, which never free a raw after a successful append.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // Not accepted by the transaction: still owned by this function.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // From here on the transaction holds pRaw; report errors but do not free it.
  code = sdbSetRawStatus(pRaw, vgStatus);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVg->vgId, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3594

3595
/*
 * Encode `pDb` and append it to the transaction with SDB status `dbStatus`.
 *
 * stage == TRN_STAGE_COMMIT_ACTION appends to the commit log; any other stage appends to the
 * redo log.
 *
 * Ownership: the encoded raw is released here only while the transaction has not accepted it.
 * Once the append succeeds the raw is referenced by the transaction, so it must not be freed on
 * a later failure (freeing it would leave the transaction with a dangling pointer). This matches
 * the vgroup builders in this file, which never free a raw after a successful append.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
  int32_t         code = 0;
  FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;

  SSdbRaw *pRaw = mndDbActionEncode(pDb);
  if (pRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = appendActionCb(pTrans, pRaw)) != 0) {
    // Not accepted by the transaction: still owned by this function.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // From here on the transaction holds pRaw; report errors but do not free it.
  code = sdbSetRawStatus(pRaw, dbStatus);
  if (code != 0) {
    mError("db:%s, failed to set raw status to ready, error:%s, line:%d", pDb->name, tstrerror(code), __LINE__);
  }

  TAOS_RETURN(code);
}
3616

3617
/*
 * Build and prepare the "split-vgroup" transaction that replaces `pVgroup` with two new
 * single-replica vgroups, each covering one half of the original hash range.
 *
 * Outline of the queued work:
 *   1. Bring the source vgroup to two members (add one when replica == 1, drop one when
 *      replica == 3), then disable writes on its vnodes.
 *   2. Derive newVg1 (keeps member 0, lower half of the range) and newVg2 (takes member 1, upper
 *      half), assign them fresh vgroup ids and queue the hash-range change actions.
 *   3. Redo log: both new vgroups READY, the source vgroup DROPPED, the db row with
 *      vgVersion/numOfVgroups bumped.
 *   4. Grow each new vgroup back to the db replica count when they differ, otherwise write its
 *      READY row to the commit log; then the source vgroup DROPPED and the db row again.
 *
 * Returns 0 once the transaction has been prepared, otherwise the error code of the step that
 * failed. The dnode array, the transaction handle and the duplicated retention array are always
 * released before returning.
 */
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SDbObj  dbObj = {0};
  SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);

#if defined(USE_SHARED_STORAGE)
  if (tsSsEnabled) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mError("vgId:%d, db:%s, shared storage exists, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
    goto _OVER;
  }
#endif

  // The dnode array is handed to the add/remove-vnode helpers below; fail early instead of
  // passing them a NULL array (same check as in mndProcessBalanceVgroupMsg).
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  /*
    if (pDb->cfg.withArbitrator) {
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      mError("vgId:%d, db:%s, with arbitrator, split vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
      goto _OVER;
    }
  */

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);

  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  SVgObj newVg1 = {0};
  memcpy(&newVg1, pVgroup, sizeof(SVgObj));
  mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }

  if (newVg1.replica == 1) {
    // Grow 1 -> 2: the new member joins as learner, is created, then promoted to voter.
    TAOS_CHECK_GOTO(mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]), NULL, _OVER);

    newVg1.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
    TAOS_CHECK_GOTO(mndAddAlterVnodeTypeAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);

    TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  } else if (newVg1.replica == 3) {
    // Shrink 3 -> 2: drop one member and tell the two survivors.
    SVnodeGid del1 = {0};
    TAOS_CHECK_GOTO(mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true), NULL, _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId), NULL,
                    _OVER);
    TAOS_CHECK_GOTO(mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId), NULL,
                    _OVER);
  } else {
    // NOTE(review): any other replica count continues without a membership change.
    // goto _OVER;
  }

  for (int32_t i = 0; i < newVg1.replica; ++i) {
    TAOS_CHECK_GOTO(mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId), NULL,
                    _OVER);
  }
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);

  // newVg1 keeps member 0 and the lower half of the range; newVg2 takes member 1 and the rest.
  SVgObj newVg2 = {0};
  memcpy(&newVg2, &newVg1, sizeof(SVgObj));
  newVg1.replica = 1;
  newVg1.hashEnd = newVg1.hashBegin / 2 + newVg1.hashEnd / 2;
  memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));

  newVg2.replica = 1;
  newVg2.hashBegin = newVg1.hashEnd + 1;
  memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
  memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));

  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
        newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
  for (int32_t i = 0; i < newVg1.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
  }
  mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
        newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
  // Iterate over newVg2's own members (both counts are 1 at this point).
  for (int32_t i = 0; i < newVg2.replica; ++i) {
    mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
  }

  // alter vgId and hash range
  int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  int32_t srcVgId = newVg1.vgId;
  newVg1.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1), NULL, _OVER);

  maxVgId++;
  srcVgId = newVg2.vgId;
  newVg2.vgId = maxVgId;
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);
  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // update db status
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  if (dbObj.cfg.pRetensions != NULL) {
    // Give the copy its own array so that the destroy at _OVER never touches pDb's array.
    dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
    if (dbObj.cfg.pRetensions == NULL) {
      code = terrno;
      goto _OVER;
    }
  }
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  dbObj.cfg.numOfVgroups++;
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION), NULL, _OVER);

  // adjust vgroup replica
  if (pDb->cfg.replications != newVg1.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  if (pDb->cfg.replications != newVg2.replica) {
    SVgObj tmpGroup = {0};
    TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray, &tmpGroup), NULL, _OVER);
  } else {
    TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  // commit db status
  dbObj.vgVersion++;
  dbObj.updateTime = taosGetTimestampMs();
  TAOS_CHECK_GOTO(mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  taosArrayDestroy(pArray);
  mndTransDrop(pTrans);
  taosArrayDestroy(dbObj.cfg.pRetensions);
  TAOS_RETURN(code);
}
3779

3780
// Actual split-vgroup request handler. A do-nothing definition is supplied below when
// TD_ENTERPRISE is not defined; otherwise it is expected to be provided elsewhere.
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);

// File-local entry point that simply forwards the request to the implementation.
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
  int32_t result = mndProcessSplitVgroupMsgImp(pReq);
  return result;
}

#ifndef TD_ENTERPRISE
// Fallback: ignores the request and reports success.
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
#endif
#endif
3787

3788
/*
 * Queue the actions that move one replica of `pVgroup` from dnode `pSrc` to dnode `pDst`:
 * first a replica is added on `pDst`, then the one on `pSrc` is removed, and finally the
 * resulting vgroup row is appended to the commit log with status READY.
 *
 * Operates on a private copy of `pVgroup`. Returns 0 on success or the first error code.
 */
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
                                              SDnodeObj *pSrc, SDnodeObj *pDst) {
  int32_t code = 0;
  SVgObj  movedVg = {0};
  memcpy(&movedVg, pVgroup, sizeof(SVgObj));

  mInfo("vgId:%d, vgroup info before balance, replica:%d", movedVg.vgId, movedVg.replica);
  int32_t slot = 0;
  while (slot < movedVg.replica) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, slot, movedVg.vnodeGid[slot].dnodeId);
    ++slot;
  }

  TAOS_CHECK_RETURN(mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &movedVg, pDst->id));
  TAOS_CHECK_RETURN(mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &movedVg, pSrc->id));

  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&movedVg);
  if (pCommitRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // Rejected by the transaction, so the raw is released here.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
  if (code != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", movedVg.vgId, tstrerror(code), __LINE__);
    TAOS_RETURN(code);
  }

  mInfo("vgId:%d, vgroup info after balance, replica:%d", movedVg.vgId, movedVg.replica);
  slot = 0;
  while (slot < movedVg.replica) {
    mInfo("vgId:%d, vnode:%d dnode:%d", movedVg.vgId, slot, movedVg.vnodeGid[slot].dnodeId);
    ++slot;
  }
  TAOS_RETURN(code);
}
3825

3826
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst,
14,441✔
3827
                                            SHashObj *pBalancedVgroups) {
3828
  void   *pIter = NULL;
14,441✔
3829
  int32_t code = -1;
14,441✔
3830
  SSdb   *pSdb = pMnode->pSdb;
14,441✔
3831

3832
  while (1) {
8,891✔
3833
    SVgObj *pVgroup = NULL;
23,332✔
3834
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
23,332✔
3835
    if (pIter == NULL) break;
23,332✔
3836
    if (taosHashGet(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t)) != NULL) {
23,332✔
3837
      sdbRelease(pSdb, pVgroup);
8,197✔
3838
      continue;
8,197✔
3839
    }
3840

3841
    bool existInSrc = false;
15,135✔
3842
    bool existInDst = false;
15,135✔
3843
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
43,920✔
3844
      SVnodeGid *pGid = &pVgroup->vnodeGid[i];
28,785✔
3845
      if (pGid->dnodeId == pSrc->id) existInSrc = true;
28,785✔
3846
      if (pGid->dnodeId == pDst->id) existInDst = true;
28,785✔
3847
    }
3848

3849
    if (!existInSrc || existInDst) {
15,135✔
3850
      sdbRelease(pSdb, pVgroup);
694✔
3851
      continue;
694✔
3852
    }
3853

3854
    SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
14,441✔
3855
    if (pDb == NULL) {
14,441✔
UNCOV
3856
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
3857
      if (terrno != 0) code = terrno;
×
UNCOV
3858
      mError("vgId:%d, balance vgroup can't find db obj dbName:%s", pVgroup->vgId, pVgroup->dbName);
×
3859
      goto _OUT;
×
3860
    }
3861

3862
    if (pDb->cfg.withArbitrator) {
14,441✔
UNCOV
3863
      mInfo("vgId:%d, db:%s, with arbitrator, balance vgroup not allowed", pVgroup->vgId, pVgroup->dbName);
×
UNCOV
3864
      goto _OUT;
×
3865
    }
3866

3867
    code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
14,441✔
3868
    if (code == 0) {
14,441✔
3869
      code = taosHashPut(pBalancedVgroups, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t));
14,441✔
3870
    }
3871

3872
  _OUT:
14,441✔
3873
    mndReleaseDb(pMnode, pDb);
14,441✔
3874
    sdbRelease(pSdb, pVgroup);
14,441✔
3875
    sdbCancelFetch(pSdb, pIter);
14,441✔
3876
    break;
14,441✔
3877
  }
3878

3879
  return code;
14,441✔
3880
}
3881

3882
/*
 * Build the "balance-vgroup" transaction.
 *
 * `pArray` holds the candidate dnodes (SDnodeObj elements). Each round sorts the array with
 * mndCompareDnodeVnodes, takes the last element as the source and the first as the destination,
 * and moves one vgroup between them as long as the scores say it is worthwhile. The vnode
 * counters of the array elements are adjusted after each successful move so the next round sees
 * the new distribution.
 *
 * Returns:
 *   0                             nothing had to be moved;
 *   TSDB_CODE_ACTION_IN_PROGRESS  the transaction was prepared and is running;
 *   any other value               the error code of the step that failed.
 */
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
  int32_t   code = -1;
  int32_t   numOfVgroups = 0;
  STrans   *pTrans = NULL;
  SHashObj *pBalancedVgroups = NULL;

  // Remembers the vgroups already scheduled so that each one is moved at most once.
  pBalancedVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pBalancedVgroups == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "balance-vgroup");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to balance vgroup", pTrans->id);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransCheckConflictWithRetention(pMnode, pTrans), NULL, _OVER);

  while (1) {
    taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
      SDnodeObj *pDnode = taosArrayGet(pArray, i);
      mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
            pDnode->numOfOtherNodes, pDnode->numOfSupportVnodes, mndGetDnodeScore(pDnode, 0, 1));
    }

    SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
    SDnodeObj *pDst = taosArrayGet(pArray, 0);

    // Scores as they would be after moving one vnode from pSrc to pDst.
    float srcScore = mndGetDnodeScore(pSrc, -1, 1);
    float dstScore = mndGetDnodeScore(pDst, 1, 1);
    mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
          pDst->id, dstScore);

    if (srcScore > dstScore - 0.000001) {
      code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
      if (code == 0) {
        pSrc->numOfVnodes--;
        pDst->numOfVnodes++;
        numOfVgroups++;
        continue;
      } else {
        mInfo("trans:%d, no vgroup need to balance from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
        break;
      }
    } else {
      mInfo("trans:%d, no vgroup need to balance any more", pTrans->id);
      break;
    }
  }

  if (numOfVgroups <= 0) {
    mInfo("no need to balance vgroup");
    code = 0;
  } else {
    mInfo("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
    // Keep the prepare error: leaving `code` untouched could report success for a failed prepare.
    code = mndTransPrepare(pMnode, pTrans);
    if (code != 0) goto _OVER;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  taosHashCleanup(pBalancedVgroups);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
3950

3951
/*
 * Handle a "balance vgroup" request.
 *
 * Steps: decode the request, check the MND_OPER_BALANCE_VGROUP privilege, refuse when any mount
 * exists or any dnode is offline, build the dnode array and run mndBalanceVgroup when at least
 * two dnodes are available. An audit record is written when the audit level is at least
 * AUDIT_LEVEL_CLUSTER.
 *
 * Returns 0 when nothing had to be done, TSDB_CODE_ACTION_IN_PROGRESS when a transaction was
 * started, otherwise an error code.
 */
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  SArray *pArray = NULL;
  void   *pIter = NULL;
  int64_t curMs = taosGetTimestampMs();
  int64_t tss = taosGetTimestampMs();

  SBalanceVgroupReq req = {0};
  if (tDeserializeSBalanceVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to balance vgroup");
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_BALANCE_VGROUP)) != 0) {
    goto _OVER;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_MOUNT) > 0) {
    code = TSDB_CODE_MND_MOUNT_NOT_EMPTY;
    goto _OVER;
  }

  // Balancing is refused as soon as one offline dnode is found.
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    if (!mndIsDnodeOnline(pDnode, curMs)) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      code = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
      // Report the code chosen here; terrno is not set on this path and may hold a stale value.
      mError("failed to balance vgroup since %s, dnode:%d", tstrerror(code), pDnode->id);
      sdbRelease(pMnode->pSdb, pDnode);
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pDnode);
  }

  pArray = mndBuildDnodesArray(pMnode, 0, NULL);
  if (pArray == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  if (taosArrayGetSize(pArray) < 2) {
    mInfo("no need to balance vgroup since dnode num less than 2");
    code = 0;
  } else {
    code = mndBalanceVgroup(pMnode, pReq, pArray);
  }

  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;  // milliseconds -> seconds
    auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen, duration, 0);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to balance vgroup since %s", tstrerror(code));
  }

  taosArrayDestroy(pArray);
  tFreeSBalanceVgroupReq(&req);
  TAOS_RETURN(code);
}
4020

4021
// True when the vgroup is not flagged as TSMA and its dbUid equals `dbUid`.
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) {
  if (pVgroup->isTsma) {
    return false;
  }
  return pVgroup->dbUid == dbUid;
}
111,716,832✔
4022

4023
// True when one of the first `replica` members of the vgroup is hosted on dnode `dnodeId`.
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
  bool hosted = false;
  for (int slot = 0; !hosted && slot < pVgroup->replica; ++slot) {
    hosted = (pVgroup->vnodeGid[slot].dnodeId == dnodeId);
  }
  return hosted;
}
4029

4030
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
121,570✔
4031
                                     STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4032
                                     ETriggerType triggerType) {
4033
  SCompactVnodeReq compactReq = {0};
121,570✔
4034
  compactReq.dbUid = pDb->uid;
121,570✔
4035
  compactReq.compactStartTime = compactTs;
121,570✔
4036
  compactReq.tw = tw;
121,570✔
4037
  compactReq.metaOnly = metaOnly;
121,570✔
4038
  compactReq.force = force;
121,570✔
4039
  compactReq.optrType = type;
121,570✔
4040
  compactReq.triggerType = triggerType;
121,570✔
4041
  tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
121,570✔
4042

4043
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
121,570✔
4044
  int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq);
121,570✔
4045
  if (contLen < 0) {
121,570✔
UNCOV
4046
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
4047
    return NULL;
×
4048
  }
4049
  contLen += sizeof(SMsgHead);
121,570✔
4050

4051
  void *pReq = taosMemoryMalloc(contLen);
121,570✔
4052
  if (pReq == NULL) {
121,570✔
UNCOV
4053
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
4054
    return NULL;
×
4055
  }
4056

4057
  SMsgHead *pHead = pReq;
121,570✔
4058
  pHead->contLen = htonl(contLen);
121,570✔
4059
  pHead->vgId = htonl(pVgroup->vgId);
121,570✔
4060

4061
  if (tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq) < 0) {
121,570✔
UNCOV
4062
    taosMemoryFree(pReq);
×
UNCOV
4063
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
4064
    return NULL;
×
4065
  }
4066
  *pContLen = contLen;
121,570✔
4067
  return pReq;
121,570✔
4068
}
4069

4070
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
54,453✔
4071
                                        STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4072
                                        ETriggerType triggerType) {
4073
  int32_t      code = 0;
54,453✔
4074
  STransAction action = {0};
54,453✔
4075
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
54,453✔
4076

4077
  int32_t contLen = 0;
54,453✔
4078
  void   *pReq =
4079
      mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly, force, type, triggerType);
54,453✔
4080
  if (pReq == NULL) {
54,453✔
UNCOV
4081
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
4082
    if (terrno != 0) code = terrno;
×
UNCOV
4083
    TAOS_RETURN(code);
×
4084
  }
4085

4086
  action.pCont = pReq;
54,453✔
4087
  action.contLen = contLen;
54,453✔
4088
  action.msgType = TDMT_VND_COMPACT;
54,453✔
4089

4090
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
54,453✔
UNCOV
4091
    taosMemoryFree(pReq);
×
UNCOV
4092
    TAOS_RETURN(code);
×
4093
  }
4094

4095
  TAOS_RETURN(code);
54,453✔
4096
}
4097

4098
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
54,453✔
4099
                                    STimeWindow tw, bool metaOnly, bool force, ETsdbOpType type,
4100
                                    ETriggerType triggerType) {
4101
  TAOS_CHECK_RETURN(
54,453✔
4102
      mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly, force, type, triggerType));
4103
  return 0;
54,453✔
4104
}
4105

4106
int32_t mndBuildTrimVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t startTs,
67,117✔
4107
                                 STimeWindow tw, ETsdbOpType type, ETriggerType triggerType) {
4108
  int32_t      code = 0;
67,117✔
4109
  STransAction action = {0};
67,117✔
4110
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
67,117✔
4111

4112
  int32_t contLen = 0;
67,117✔
4113
  // reuse SCompactVnodeReq as SVTrimDbReq
4114
  void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, startTs, tw, false, false, type, triggerType);
67,117✔
4115
  if (pReq == NULL) {
67,117✔
UNCOV
4116
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
4117
    if (terrno != 0) code = terrno;
×
UNCOV
4118
    TAOS_RETURN(code);
×
4119
  }
4120

4121
  action.pCont = pReq;
67,117✔
4122
  action.contLen = contLen;
67,117✔
4123
  action.msgType = TDMT_VND_TRIM;
67,117✔
4124

4125
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
67,117✔
UNCOV
4126
    taosMemoryFree(pReq);
×
UNCOV
4127
    TAOS_RETURN(code);
×
4128
  }
4129

4130
  TAOS_RETURN(code);
67,117✔
4131
}
4132

4133
/*
 * Handle a "set vgroup keep version" request.
 *
 * Steps:
 *   1. Deserialize the request and check the caller's MND_OPER_WRITE_DB
 *      privilege.
 *   2. Acquire the vgroup named by req.vgId and create a serial retry
 *      transaction.
 *   3. Encode a copy of the vgroup carrying the new keepVersion and the current
 *      timestamp, and append it to the transaction's commit log.
 *   4. Append one TDMT_VND_SET_KEEP_VERSION redo action per replica, addressed
 *      to the dnode hosting that replica.
 *   5. Prepare the transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 * otherwise the error code of the step that failed. The vgroup reference and
 * the transaction object are released on every path.
 */
static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = TSDB_CODE_SUCCESS;
  STrans *pTrans = NULL;
  SVgObj *pVgroup = NULL;

  SMndSetVgroupKeepVersionReq req = {0};
  if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion);

  // Check permission
  if ((code = mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB)) != 0) {
    goto _OVER;
  }

  // Get vgroup; from here on the reference is released at _OVER unless it was
  // already released (and pVgroup reset to NULL) on the success path.
  pVgroup = mndAcquireVgroup(pMnode, req.vgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("vgId:%d not exist, failed to set keep version", req.vgId);
    goto _OVER;
  }

  // Create transaction
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version");
  if (pTrans == NULL) {
    code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId,
        req.keepVersion);

  // Build the updated vgroup object: a copy of the current one with the new
  // keepVersion and the time at which it was set.
  SVgObj newVgroup = {0};
  memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
  newVgroup.keepVersion = req.keepVersion;
  newVgroup.keepVersionTime = taosGetTimestampMs();

  // Add the commit log that stores the updated vgroup in SDB.
  SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  // Mark the raw READY *before* appending it. On either failure below the raw
  // has not been appended yet, so freeing it here cannot collide with anything
  // mndTransDrop() releases at _OVER.
  // NOTE(review): assumes a successful mndTransAppendCommitlog() makes pTrans
  // responsible for the raw (later error paths never free it) -- confirm.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__);
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  // Use the call's own return value: it is non-zero on failure, whereas terrno
  // may still be 0 and would make this function report success.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }

  // Prepare message for vnodes
  SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion};
  int32_t               reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq);
  if (reqLen < 0) {
    // A negative length would make the buffer below smaller than SMsgHead.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  int32_t contLen = reqLen + (int32_t)sizeof(SMsgHead);

  // Send to all replicas of the vgroup
  for (int32_t i = 0; i < pVgroup->replica; ++i) {
    SMsgHead *pHead = taosMemoryMalloc(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);

    if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    // Get dnode and add action to transaction
    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      taosMemoryFree(pHead);
      code = TSDB_CODE_MND_DNODE_NOT_EXIST;
      goto _OVER;
    }

    STransAction action = {0};
    action.epSet = mndGetDnodeEpset(pDnode);
    mndReleaseDnode(pMnode, pDnode);
    action.pCont = pHead;
    action.contLen = contLen;
    action.msgType = TDMT_VND_SET_KEEP_VERSION;
    action.acceptableCode = TSDB_CODE_VND_STOPPED;

    // Same pattern as the other action builders in this file: keep the return
    // value so a failure can never be reported as 0.
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
      taosMemoryFree(pHead);
      goto _OVER;
    }
  }

  // The vgroup is no longer needed; release it before preparing the
  // transaction and clear the pointer so _OVER does not release it again.
  mndReleaseVgroup(pMnode, pVgroup);
  pVgroup = NULL;

  // Prepare and execute transaction
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (pVgroup != NULL) mndReleaseVgroup(pMnode, pVgroup);
  if (pTrans != NULL) mndTransDrop(pTrans);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc