• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4404

30 Jun 2025 02:45AM UTC coverage: 62.241% (-0.4%) from 62.635%
#4404

push

travis-ci

web-flow
Merge pull request #31480 from taosdata/docs/3.0/TD-34215

add stmt2 docs

153837 of 315978 branches covered (48.69%)

Branch coverage included in aggregate %.

238272 of 314005 relevant lines covered (75.88%)

6134648.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.54
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnodeInt.h"
21

22
extern taos_counter_t *tsInsertCounter;
23

24
// Forward declaration for function defined in metrics.c
25
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
26
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
27

28
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
95,894✔
29
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
95,894✔
30
  if (pInfo->pVloads == NULL) return;
95,894!
31

32
  tfsUpdateSize(pMgmt->pTfs);
95,894✔
33

34
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
95,894✔
35

36
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
95,894✔
37
  while (pIter) {
338,672✔
38
    SVnodeObj **ppVnode = pIter;
242,778✔
39
    if (ppVnode == NULL || *ppVnode == NULL) continue;
242,778!
40

41
    SVnodeObj *pVnode = *ppVnode;
242,778✔
42
    SVnodeLoad vload = {.vgId = pVnode->vgId};
242,778✔
43
    if (!pVnode->failed) {
242,778!
44
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
242,778!
45
        dError("failed to get vnode load");
×
46
      }
47
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
242,778✔
48
    }
49
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
485,556!
50
      dError("failed to push vnode load");
×
51
    }
52
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
242,778✔
53
  }
54

55
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
95,894✔
56
}
57

58
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
5✔
59
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
5✔
60
  if (!pInfo->pVloads) return;
5!
61

62
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
5✔
63

64
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
5✔
65
  while (pIter) {
5!
66
    SVnodeObj **ppVnode = pIter;
×
67
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
68

69
    SVnodeObj *pVnode = *ppVnode;
×
70
    if (!pVnode->failed) {
×
71
      SVnodeLoadLite vload = {0};
×
72
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
73
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
74
          taosArrayDestroy(pInfo->pVloads);
×
75
          pInfo->pVloads = NULL;
×
76
          break;
×
77
        }
78
      }
79
    }
80
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
81
  }
82

83
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
5✔
84
}
85

86
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
16✔
87
  SMonVloadInfo vloads = {0};
16✔
88
  vmGetVnodeLoads(pMgmt, &vloads, true);
16✔
89

90
  SArray *pVloads = vloads.pVloads;
16✔
91
  if (pVloads == NULL) return;
16!
92

93
  int32_t totalVnodes = 0;
16✔
94
  int32_t masterNum = 0;
16✔
95
  int64_t numOfSelectReqs = 0;
16✔
96
  int64_t numOfInsertReqs = 0;
16✔
97
  int64_t numOfInsertSuccessReqs = 0;
16✔
98
  int64_t numOfBatchInsertReqs = 0;
16✔
99
  int64_t numOfBatchInsertSuccessReqs = 0;
16✔
100

101
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
54✔
102
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
38✔
103
    numOfSelectReqs += pLoad->numOfSelectReqs;
38✔
104
    numOfInsertReqs += pLoad->numOfInsertReqs;
38✔
105
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
38✔
106
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
38✔
107
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
38✔
108
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
38!
109
      masterNum++;
38✔
110
    }
111
    totalVnodes++;
38✔
112
  }
113

114
  pInfo->vstat.totalVnodes = totalVnodes;
16✔
115
  pInfo->vstat.masterNum = masterNum;
16✔
116
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
16✔
117
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
16✔
118
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
16✔
119
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
16✔
120
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
16✔
121
  pMgmt->state.totalVnodes = totalVnodes;
16✔
122
  pMgmt->state.masterNum = masterNum;
16✔
123
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
16✔
124
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
16✔
125
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
16✔
126
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
16✔
127
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
16✔
128

129
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
16!
130
    dError("failed to get tfs monitor info");
×
131
  }
132
  taosArrayDestroy(pVloads);
16✔
133
}
134

135
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
16✔
136
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
16✔
137
  if (list_size == 0) return;
16✔
138
  int32_t *vgroup_ids;
139
  char   **keys;
140
  int      r = 0;
1✔
141
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
1✔
142
  if (r) {
1!
143
    dError("failed to get vgroup ids");
×
144
    return;
×
145
  }
146
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
1✔
147
  for (int i = 0; i < list_size; i++) {
2✔
148
    int32_t vgroup_id = vgroup_ids[i];
1✔
149
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
1✔
150
    if (vnode == NULL) {
1!
151
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
152
      if (r) {
×
153
        dError("failed to delete monitor sample key:%s", keys[i]);
×
154
      }
155
    }
156
  }
157
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
1✔
158
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
1!
159
  if (keys) taosMemoryFree(keys);
1!
160
  return;
1✔
161
}
162

163
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
164
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
165
    return;
×
166
  }
167

168
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
169
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
170
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
171
  if (pValidVgroups == NULL) {
×
172
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
173
    return;
×
174
  }
175

176
  while (pIter != NULL) {
×
177
    SVnodeObj **ppVnode = pIter;
×
178
    if (ppVnode && *ppVnode) {
×
179
      int32_t vgId = (*ppVnode)->vgId;
×
180
      char    dummy = 1;  // hash table value (we only care about the key)
×
181
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
182
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
183
      }
184
    }
185
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
186
  }
187
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
188

189
  // Clean expired metrics by removing metrics for non-existent vgroups
190
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
191
  if (code != TSDB_CODE_SUCCESS) {
×
192
    dError("failed to clean expired metrics, code:%d", code);
×
193
  }
194

195
  taosHashCleanup(pValidVgroups);
×
196
}
197

198
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
10,640✔
199
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
10,640✔
200

201
  pCfg->vgId = pCreate->vgId;
10,640✔
202
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
10,640✔
203
  pCfg->dbId = pCreate->dbUid;
10,640✔
204
  pCfg->szPage = pCreate->pageSize * 1024;
10,640✔
205
  pCfg->szCache = pCreate->pages;
10,640✔
206
  pCfg->cacheLast = pCreate->cacheLast;
10,640✔
207
  pCfg->cacheLastSize = pCreate->cacheLastSize;
10,640✔
208
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
10,640✔
209
  pCfg->isWeak = true;
10,640✔
210
  pCfg->isTsma = pCreate->isTsma;
10,640✔
211
  pCfg->tsdbCfg.compression = pCreate->compression;
10,640✔
212
  pCfg->tsdbCfg.precision = pCreate->precision;
10,640✔
213
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
10,640✔
214
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
10,640✔
215
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
10,640✔
216
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
10,640✔
217
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
10,640✔
218
  pCfg->tsdbCfg.minRows = pCreate->minRows;
10,640✔
219
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
10,640✔
220
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
10,667✔
221
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
27✔
222
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
27✔
223
    if (i == 0) {
27✔
224
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
9!
225
    }
226
  }
227
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
228
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
10,640✔
229
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
10,640✔
230
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1✔
231
  }
232
#else
233
  pCfg->tsdbCfg.encryptAlgorithm = 0;
234
#endif
235

236
  pCfg->walCfg.vgId = pCreate->vgId;
10,640✔
237
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
10,640✔
238
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
10,640✔
239
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
10,640✔
240
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
10,640✔
241
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
10,640✔
242
  pCfg->walCfg.level = pCreate->walLevel;
10,640✔
243
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
244
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
10,640✔
245
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
10,640✔
246
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1✔
247
  }
248
#else
249
  pCfg->walCfg.encryptAlgorithm = 0;
250
#endif
251

252
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
253
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
10,640✔
254
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
10,640✔
255
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1✔
256
  }
257
#else
258
  pCfg->tdbEncryptAlgorithm = 0;
259
#endif
260

261
  pCfg->sttTrigger = pCreate->sstTrigger;
10,640✔
262
  pCfg->hashBegin = pCreate->hashBegin;
10,640✔
263
  pCfg->hashEnd = pCreate->hashEnd;
10,640✔
264
  pCfg->hashMethod = pCreate->hashMethod;
10,640✔
265
  pCfg->hashPrefix = pCreate->hashPrefix;
10,640✔
266
  pCfg->hashSuffix = pCreate->hashSuffix;
10,640✔
267
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
10,640✔
268

269
  pCfg->s3ChunkSize = pCreate->s3ChunkSize;
10,640✔
270
  pCfg->s3KeepLocal = pCreate->s3KeepLocal;
10,640✔
271
  pCfg->s3Compact = pCreate->s3Compact;
10,640✔
272

273
  pCfg->standby = 0;
10,640✔
274
  pCfg->syncCfg.replicaNum = 0;
10,640✔
275
  pCfg->syncCfg.totalReplicaNum = 0;
10,640✔
276
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
10,640✔
277

278
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
10,640✔
279
  for (int32_t i = 0; i < pCreate->replica; ++i) {
24,658✔
280
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
14,018✔
281
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
14,018✔
282
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
14,018✔
283
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
14,018✔
284
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
14,018✔
285
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
14,018✔
286
    pCfg->syncCfg.replicaNum++;
14,018✔
287
  }
288
  if (pCreate->selfIndex != -1) {
10,640✔
289
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
10,443✔
290
  }
291
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
10,837✔
292
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
197✔
293
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
197✔
294
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
197✔
295
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
197✔
296
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
197✔
297
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
197✔
298
    pCfg->syncCfg.totalReplicaNum++;
197✔
299
  }
300
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
10,640✔
301
  if (pCreate->learnerSelfIndex != -1) {
10,640✔
302
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
197✔
303
  }
304
}
10,640✔
305

306
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
10,639✔
307
  pCfg->vgId = pCreate->vgId;
10,639✔
308
  pCfg->vgVersion = pCreate->vgVersion;
10,639✔
309
  pCfg->dropped = 0;
10,639✔
310
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
10,639✔
311
}
10,639✔
312

313
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
10,639✔
314
  if (pReq->isTsma) {
10,639!
315
    SMsgHead *smaMsg = pReq->pTsma;
×
316
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
×
317
    return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
×
318
  }
319
  return 0;
10,639✔
320
}
321

322
#if 0
323
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
324
  if (pReq->isTsma) {
325
    SMsgHead *smaMsg = pReq->pTsma;
326
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
327
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
328
  }
329
  return 0;
330
}
331
#endif
332

333
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
10,632✔
334
  SCreateVnodeReq req = {0};
10,632✔
335
  SVnodeCfg       vnodeCfg = {0};
10,632✔
336
  SWrapperCfg     wrapperCfg = {0};
10,632✔
337
  int32_t         code = -1;
10,632✔
338
  char            path[TSDB_FILENAME_LEN] = {0};
10,632✔
339

340
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
10,632!
341
    return TSDB_CODE_INVALID_MSG;
×
342
  }
343

344
  if (req.learnerReplica == 0) {
10,585✔
345
    req.learnerSelfIndex = -1;
10,394✔
346
  }
347

348
  dInfo(
10,585!
349
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
350
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
351
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d s3ChunkSize:%d s3KeepLocal:%d s3Compact:%d tsma:%d "
352
      "precision:%d compression:%d minRows:%d maxRows:%d"
353
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
354
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
355
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
356
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
357
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
358
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
359
      req.keepTimeOffset, req.s3ChunkSize, req.s3KeepLocal, req.s3Compact, req.isTsma, req.precision, req.compression,
360
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
361
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
362
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
363
      req.encryptAlgorithm);
364

365
  for (int32_t i = 0; i < req.replica; ++i) {
24,659✔
366
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
14,020!
367
          req.replicas[i].id);
368
  }
369
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
10,836✔
370
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
197!
371
          req.learnerReplicas[i].port, req.replicas[i].id);
372
  }
373

374
  SReplica *pReplica = NULL;
10,639✔
375
  if (req.selfIndex != -1) {
10,639✔
376
    pReplica = &req.replicas[req.selfIndex];
10,443✔
377
  } else {
378
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
196✔
379
  }
380
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
10,639!
381
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
10,640!
382
    code = TSDB_CODE_INVALID_MSG;
×
383
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
×
384
           pReplica->fqdn, pReplica->port, tstrerror(code));
385
    return code;
×
386
  }
387

388
  if (req.encryptAlgorithm == DND_CA_SM4) {
10,640✔
389
    if (strlen(tsEncryptKey) == 0) {
1!
390
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
391
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
392
      return code;
×
393
    }
394
  }
395

396
  vmGenerateVnodeCfg(&req, &vnodeCfg);
10,640✔
397

398
  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
10,640!
399
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
×
400
    goto _OVER;
×
401
  }
402

403
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
10,639✔
404

405
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
10,639✔
406
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
10,638!
407
    dError("vgId:%d, already exist", req.vgId);
299!
408
    (void)tFreeSCreateVnodeReq(&req);
299✔
409
    vmReleaseVnode(pMgmt, pVnode);
299✔
410
    code = TSDB_CODE_VND_ALREADY_EXIST;
299✔
411
    return 0;
299✔
412
  }
413

414
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
10,339✔
415
  if (diskPrimary < 0) {
10,341!
416
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
10,341✔
417
  }
418
  wrapperCfg.diskPrimary = diskPrimary;
10,341✔
419

420
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
10,341✔
421

422
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
10,341!
423
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
424
    vmReleaseVnode(pMgmt, pVnode);
×
425
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
426
    (void)tFreeSCreateVnodeReq(&req);
×
427
    return code;
×
428
  }
429

430
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
10,341✔
431
  if (pImpl == NULL) {
10,339!
432
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
433
    code = terrno != 0 ? terrno : -1;
×
434
    goto _OVER;
×
435
  }
436

437
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
10,339✔
438
  if (code != 0) {
10,341!
439
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
440
    code = terrno != 0 ? terrno : code;
×
441
    goto _OVER;
×
442
  }
443

444
#if 0
445
  code = vmTsmaProcessCreate(pImpl, &req);
446
  if (code != 0) {
447
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
448
    code = terrno;
449
    goto _OVER;
450
  }
451
#endif
452

453
  code = vnodeStart(pImpl);
10,341✔
454
  if (code != 0) {
10,341!
455
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
456
    goto _OVER;
×
457
  }
458

459
  code = vmWriteVnodeListToFile(pMgmt);
10,341✔
460
  if (code != 0) {
10,341!
461
    code = terrno != 0 ? terrno : code;
×
462
    goto _OVER;
×
463
  }
464

465
_OVER:
10,341✔
466
  vmCleanPrimaryDisk(pMgmt, req.vgId);
10,341✔
467

468
  if (code != 0) {
10,341!
469
    vmCloseFailedVnode(pMgmt, req.vgId);
×
470

471
    vnodeClose(pImpl);
×
472
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
473
  } else {
474
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
10,341!
475
          TMSG_INFO(pMsg->msgType));
476
  }
477

478
  (void)tFreeSCreateVnodeReq(&req);
10,341✔
479
  terrno = code;
10,341✔
480
  return code;
10,341✔
481
}
482

483
// alter replica doesn't use this, but restore dnode still use this
484
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,897✔
485
  SAlterVnodeTypeReq req = {0};
3,897✔
486
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
3,897!
487
    terrno = TSDB_CODE_INVALID_MSG;
×
488
    return -1;
×
489
  }
490

491
  if (req.learnerReplicas == 0) {
492
    req.learnerSelfIndex = -1;
493
  }
494

495
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
3,897!
496
        TMSG_INFO(pMsg->msgType));
497

498
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
3,897✔
499
  if (pVnode == NULL) {
3,897!
500
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
501
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
502
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
503
    return -1;
×
504
  }
505

506
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
3,897✔
507
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
3,897!
508
  if (role == TAOS_SYNC_ROLE_VOTER) {
3,897!
509
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
510
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
511
    vmReleaseVnode(pMgmt, pVnode);
×
512
    return -1;
×
513
  }
514

515
  dInfo("vgId:%d, checking node catch up", req.vgId);
3,897!
516
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
3,897✔
517
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
3,703✔
518
    vmReleaseVnode(pMgmt, pVnode);
3,703✔
519
    return -1;
3,703✔
520
  }
521

522
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
194!
523

524
  int32_t vgId = req.vgId;
194✔
525
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
194!
526
        req.selfIndex, req.strict, req.changeVersion);
527
  for (int32_t i = 0; i < req.replica; ++i) {
772✔
528
    SReplica *pReplica = &req.replicas[i];
578✔
529
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
578!
530
  }
531
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
194!
532
    SReplica *pReplica = &req.learnerReplicas[i];
×
533
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
534
  }
535

536
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
194!
537
      req.learnerSelfIndex >= req.learnerReplica) {
194!
538
    terrno = TSDB_CODE_INVALID_MSG;
×
539
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
540
    vmReleaseVnode(pMgmt, pVnode);
×
541
    return -1;
×
542
  }
543

544
  SReplica *pReplica = NULL;
194✔
545
  if (req.selfIndex != -1) {
194!
546
    pReplica = &req.replicas[req.selfIndex];
194✔
547
  } else {
548
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
549
  }
550

551
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
194!
552
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
194!
553
    terrno = TSDB_CODE_INVALID_MSG;
×
554
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
555
           pReplica->port);
556
    vmReleaseVnode(pMgmt, pVnode);
×
557
    return -1;
×
558
  }
559

560
  dInfo("vgId:%d, start to close vnode", vgId);
194!
561
  SWrapperCfg wrapperCfg = {
194✔
562
      .dropped = pVnode->dropped,
194✔
563
      .vgId = pVnode->vgId,
194✔
564
      .vgVersion = pVnode->vgVersion,
194✔
565
      .diskPrimary = pVnode->diskPrimary,
194✔
566
  };
567
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
194✔
568

569
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
194✔
570
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
194✔
571

572
  int32_t diskPrimary = wrapperCfg.diskPrimary;
194✔
573
  char    path[TSDB_FILENAME_LEN] = {0};
194✔
574
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
194✔
575

576
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
194!
577
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
194!
578
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
579
    return -1;
×
580
  }
581

582
  dInfo("vgId:%d, begin to open vnode", vgId);
194!
583
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
194✔
584
  if (pImpl == NULL) {
194!
585
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
586
    return -1;
×
587
  }
588

589
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
194!
590
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
591
    return -1;
×
592
  }
593

594
  if (vnodeStart(pImpl) != 0) {
194!
595
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
596
    return -1;
×
597
  }
598

599
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
194!
600
        req.vgId, TMSG_INFO(pMsg->msgType));
601
  return 0;
194✔
602
}
603

604
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
605
  SCheckLearnCatchupReq req = {0};
×
606
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
607
    terrno = TSDB_CODE_INVALID_MSG;
×
608
    return -1;
×
609
  }
610

611
  if (req.learnerReplicas == 0) {
612
    req.learnerSelfIndex = -1;
613
  }
614

615
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
616
        TMSG_INFO(pMsg->msgType));
617

618
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
619
  if (pVnode == NULL) {
×
620
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
621
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
622
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
623
    return -1;
×
624
  }
625

626
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
627
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
628
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
629
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
630
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
631
    vmReleaseVnode(pMgmt, pVnode);
×
632
    return -1;
×
633
  }
634

635
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
636
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
637
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
638
    vmReleaseVnode(pMgmt, pVnode);
×
639
    return -1;
×
640
  }
641

642
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
643

644
  vmReleaseVnode(pMgmt, pVnode);
×
645

646
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
647
        TMSG_INFO(pMsg->msgType));
648

649
  return 0;
×
650
}
651

652
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
32✔
653
  SDisableVnodeWriteReq req = {0};
32✔
654
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
32!
655
    terrno = TSDB_CODE_INVALID_MSG;
×
656
    return -1;
×
657
  }
658

659
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
32!
660

661
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
32✔
662
  if (pVnode == NULL) {
32!
663
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
664
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
665
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
666
    return -1;
×
667
  }
668

669
  pVnode->disable = req.disable;
32✔
670
  vmReleaseVnode(pMgmt, pVnode);
32✔
671
  return 0;
32✔
672
}
673

674
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
30✔
675
  SAlterVnodeHashRangeReq req = {0};
30✔
676
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
30!
677
    terrno = TSDB_CODE_INVALID_MSG;
×
678
    return -1;
×
679
  }
680

681
  int32_t srcVgId = req.srcVgId;
30✔
682
  int32_t dstVgId = req.dstVgId;
30✔
683

684
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
30✔
685
  if (pVnode != NULL) {
30!
686
    dError("vgId:%d, vnode already exist", dstVgId);
×
687
    vmReleaseVnode(pMgmt, pVnode);
×
688
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
689
    return -1;
×
690
  }
691

692
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
30!
693
        req.dstVgId);
694
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
30✔
695
  if (pVnode == NULL) {
30!
696
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
697
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
698
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
699
    return -1;
×
700
  }
701

702
  SWrapperCfg wrapperCfg = {
30✔
703
      .dropped = pVnode->dropped,
30✔
704
      .vgId = dstVgId,
705
      .vgVersion = pVnode->vgVersion,
30✔
706
      .diskPrimary = pVnode->diskPrimary,
30✔
707
  };
708
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
30✔
709

710
  // prepare alter
711
  pVnode->toVgId = dstVgId;
30✔
712
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
30!
713
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
714
    return -1;
×
715
  }
716

717
  dInfo("vgId:%d, close vnode", srcVgId);
30!
718
  vmCloseVnode(pMgmt, pVnode, true, false);
30✔
719

720
  int32_t diskPrimary = wrapperCfg.diskPrimary;
30✔
721
  char    srcPath[TSDB_FILENAME_LEN] = {0};
30✔
722
  char    dstPath[TSDB_FILENAME_LEN] = {0};
30✔
723
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
30✔
724
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
30✔
725

726
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
30!
727
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
30!
728
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
729
    return -1;
×
730
  }
731

732
  dInfo("vgId:%d, open vnode", dstVgId);
30!
733
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
30✔
734

735
  if (pImpl == NULL) {
30!
736
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
737
    return -1;
×
738
  }
739

740
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
30!
741
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
742
    return -1;
×
743
  }
744

745
  if (vnodeStart(pImpl) != 0) {
30!
746
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
747
    return -1;
×
748
  }
749

750
  // complete alter
751
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
30!
752
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
753
    return -1;
×
754
  }
755

756
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
30!
757
  return 0;
30✔
758
}
759

760
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,358✔
761
  SAlterVnodeReplicaReq alterReq = {0};
1,358✔
762
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
1,358!
763
    terrno = TSDB_CODE_INVALID_MSG;
×
764
    return -1;
×
765
  }
766

767
  if (alterReq.learnerReplica == 0) {
1,358✔
768
    alterReq.learnerSelfIndex = -1;
989✔
769
  }
770

771
  int32_t vgId = alterReq.vgId;
1,358✔
772
  dInfo(
1,358!
773
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
774
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
775
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
776
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
777

778
  for (int32_t i = 0; i < alterReq.replica; ++i) {
5,316✔
779
    SReplica *pReplica = &alterReq.replicas[i];
3,958✔
780
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
3,958!
781
  }
782
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
1,727✔
783
    SReplica *pReplica = &alterReq.learnerReplicas[i];
369✔
784
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
369!
785
  }
786

787
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
1,358!
788
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
1,358!
789
    terrno = TSDB_CODE_INVALID_MSG;
×
790
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
791
    return -1;
×
792
  }
793

794
  SReplica *pReplica = NULL;
1,358✔
795
  if (alterReq.selfIndex != -1) {
1,358!
796
    pReplica = &alterReq.replicas[alterReq.selfIndex];
1,358✔
797
  } else {
798
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
799
  }
800

801
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,358!
802
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,358!
803
    terrno = TSDB_CODE_INVALID_MSG;
×
804
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
805
           pReplica->port);
806
    return -1;
×
807
  }
808

809
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
1,358✔
810
  if (pVnode == NULL) {
1,358!
811
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
812
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
813
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
814
    return -1;
×
815
  }
816

817
  dInfo("vgId:%d, start to close vnode", vgId);
1,358!
818
  SWrapperCfg wrapperCfg = {
1,358✔
819
      .dropped = pVnode->dropped,
1,358✔
820
      .vgId = pVnode->vgId,
1,358✔
821
      .vgVersion = pVnode->vgVersion,
1,358✔
822
      .diskPrimary = pVnode->diskPrimary,
1,358✔
823
  };
824
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
1,358✔
825

826
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
1,358✔
827
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
1,358✔
828

829
  int32_t diskPrimary = wrapperCfg.diskPrimary;
1,358✔
830
  char    path[TSDB_FILENAME_LEN] = {0};
1,358✔
831
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
1,358✔
832

833
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
1,358!
834
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
1,358!
835
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
836
    return -1;
×
837
  }
838

839
  dInfo("vgId:%d, begin to open vnode", vgId);
1,358!
840
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,358✔
841
  if (pImpl == NULL) {
1,358!
842
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
843
    return -1;
×
844
  }
845

846
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
1,358!
847
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
848
    return -1;
×
849
  }
850

851
  if (vnodeStart(pImpl) != 0) {
1,358!
852
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
853
    return -1;
×
854
  }
855

856
  dInfo(
1,358!
857
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
858
      "learnerSelfIndex:%d strict:%d",
859
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
860
      alterReq.learnerSelfIndex, alterReq.strict);
861
  return 0;
1,358✔
862
}
863

864
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
4,034✔
865
  int32_t       code = 0;
4,034✔
866
  SDropVnodeReq dropReq = {0};
4,034✔
867
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
4,034!
868
    terrno = TSDB_CODE_INVALID_MSG;
×
869
    return terrno;
×
870
  }
871

872
  int32_t vgId = dropReq.vgId;
4,034✔
873
  dInfo("vgId:%d, start to drop vnode", vgId);
4,034!
874

875
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
4,034!
876
    terrno = TSDB_CODE_INVALID_MSG;
×
877
    dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
×
878
    return terrno;
×
879
  }
880

881
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
4,034✔
882
  if (pVnode == NULL) {
4,034!
883
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
884
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
885
    return terrno;
×
886
  }
887

888
  pVnode->dropped = 1;
4,034✔
889
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
4,034!
890
    pVnode->dropped = 0;
×
891
    vmReleaseVnode(pMgmt, pVnode);
×
892
    return code;
×
893
  }
894

895
  vmCloseVnode(pMgmt, pVnode, false, false);
4,034✔
896
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
4,034!
897
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
898
  }
899

900
  dInfo("vgId:%d, is dropped", vgId);
4,034!
901
  return 0;
4,034✔
902
}
903

904
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
79✔
905
  SVArbHeartBeatReq arbHbReq = {0};
79✔
906
  SVArbHeartBeatRsp arbHbRsp = {0};
79✔
907
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
79!
908
    terrno = TSDB_CODE_INVALID_MSG;
×
909
    return -1;
×
910
  }
911

912
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
79!
913
    terrno = TSDB_CODE_INVALID_MSG;
×
914
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
×
915
    goto _OVER;
×
916
  }
917

918
  if (strlen(arbHbReq.arbToken) == 0) {
79!
919
    terrno = TSDB_CODE_INVALID_MSG;
×
920
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
921
    goto _OVER;
×
922
  }
923

924
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
79✔
925

926
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
79✔
927
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
79✔
928
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
79✔
929
  if (arbHbRsp.hbMembers == NULL) {
79!
930
    goto _OVER;
×
931
  }
932

933
  for (int32_t i = 0; i < size; i++) {
179✔
934
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
100✔
935
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
100✔
936
    if (pVnode == NULL) {
100!
937
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
×
938
      continue;
×
939
    }
940

941
    SVArbHbRspMember rspMember = {0};
100✔
942
    rspMember.vgId = pReqMember->vgId;
100✔
943
    rspMember.hbSeq = pReqMember->hbSeq;
100✔
944
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
100!
945
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
946
      vmReleaseVnode(pMgmt, pVnode);
×
947
      continue;
×
948
    }
949

950
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
100!
951
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
952
      vmReleaseVnode(pMgmt, pVnode);
×
953
      continue;
×
954
    }
955

956
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
200!
957
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
958
      vmReleaseVnode(pMgmt, pVnode);
×
959
      goto _OVER;
×
960
    }
961

962
    vmReleaseVnode(pMgmt, pVnode);
100✔
963
  }
964

965
  SRpcMsg rspMsg = {.info = pMsg->info};
79✔
966
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
79✔
967
  if (rspLen < 0) {
79!
968
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
969
    goto _OVER;
×
970
  }
971

972
  void *pRsp = rpcMallocCont(rspLen);
79✔
973
  if (pRsp == NULL) {
79!
974
    terrno = terrno;
×
975
    goto _OVER;
×
976
  }
977

978
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
79!
979
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
980
    rpcFreeCont(pRsp);
×
981
    goto _OVER;
×
982
  }
983
  pMsg->info.rsp = pRsp;
79✔
984
  pMsg->info.rspLen = rspLen;
79✔
985

986
  terrno = TSDB_CODE_SUCCESS;
79✔
987

988
_OVER:
79✔
989
  tFreeSVArbHeartBeatReq(&arbHbReq);
79✔
990
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
79✔
991
  return terrno;
79✔
992
}
993

994
SArray *vmGetMsgHandles() {
2,672✔
995
  int32_t code = -1;
2,672✔
996
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,672✔
997
  if (pArray == NULL) goto _OVER;
2,672!
998

999
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1000
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1001
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1002
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1003
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1004
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1005
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1006
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1007
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1008
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1009
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1010
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1011
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1012
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1013
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1014
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1015
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1016
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1017
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1018
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1019
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1020
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1021
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1022
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1023
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1024
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1025
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1026
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1027
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1028
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1029
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1030
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1031
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1032
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1033
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1034
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1035
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1036
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,672!
1037
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1038
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1039
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1040
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1041
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1042
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1043
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1044
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1045
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1046
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1047
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,672!
1048

1049
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1050
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1051
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,672!
1052
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1053
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1054
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1055
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1056
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1057
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1058
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1059
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1060
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1061
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1062
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1063
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1064
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1065
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
2,672!
1066
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
2,672!
1067

1068
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, vmPutMsgToStreamChkQueue, 0) == NULL) goto _OVER;
2,672!
1069
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,672!
1070
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,672!
1071
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,672!
1072

1073
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1074
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1075
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1076
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1077
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1078
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1079
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1080
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1081
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1082

1083
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1084
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1085
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1086
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1087
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1088
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1089
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1090
  if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1091
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1092
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1093
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1094
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1095
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1096

1097
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1098
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1099
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1100
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1101
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1102
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1103
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1104
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1105
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1106
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1107
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1108
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1109

1110
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,672!
1111
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,672!
1112
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,672!
1113
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,672!
1114
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,672!
1115

1116
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,672!
1117
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,672!
1118
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,672!
1119

1120
  code = 0;
2,672✔
1121

1122
_OVER:
2,672✔
1123
  if (code != 0) {
2,672!
1124
    taosArrayDestroy(pArray);
×
1125
    return NULL;
×
1126
  } else {
1127
    return pArray;
2,672✔
1128
  }
1129
}
1130

1131
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1132
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1133

1134
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1135
  while (pIter) {
×
1136
    SVnodeObj **ppVnode = pIter;
×
1137
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1138
      continue;
×
1139
    }
1140

1141
    SVnodeObj *pVnode = *ppVnode;
×
1142
    if (!pVnode->failed) {
×
1143
      SRawWriteMetrics metrics = {0};
×
1144
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1145
        // Add the metrics to the global metrics system with cluster ID
1146
        SName   name = {0};
×
1147
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1148
        if (code < 0) {
×
1149
          dError("failed to get db name since %s", tstrerror(code));
×
1150
          continue;
×
1151
        }
1152
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1153
        if (code != TSDB_CODE_SUCCESS) {
×
1154
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1155
        } else {
1156
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1157
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1158
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1159
          }
1160
        }
1161
      } else {
1162
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1163
      }
1164
    }
1165
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1166
  }
1167

1168
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1169
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc