• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.55
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22
#include "tencrypt.h"
23

24
extern taos_counter_t *tsInsertCounter;
25

26
// Forward declaration for function defined in metrics.c
27
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
28
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
29

30
/*
 * Collect a load snapshot of every vnode in the running hash into pInfo->pVloads.
 *
 * pMgmt   - vnode management object; its hashLock is read-locked for the duration of the walk.
 * pInfo   - output; pVloads becomes a freshly created array of SVnodeLoad, or NULL when the
 *           array could not be allocated (in which case nothing else is done).
 * isReset - when true, vnodeResetLoad() is invoked on every non-failed vnode after its load is read.
 *
 * Failed vnodes still contribute an entry that carries only their vgId.
 * The array is owned by the caller (vmGetMonitorInfo releases it with taosArrayDestroy).
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header, so `continue` can never re-test the same
  // entry and spin forever while the lock is held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    // Best effort: a failed push is logged and the walk continues
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
59

60
/*
 * Apply tsVnodeElectIntervalMs as the sync timeout of every vnode in the running hash.
 *
 * The hash is walked under the read side of pMgmt->hashLock. A failure on one vnode is
 * logged and does not stop the walk.
 */
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-header keeps `continue` from re-visiting the same entry forever
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
78

79
/*
 * Collect a lightweight load record (SVnodeLoadLite) for every non-failed vnode.
 *
 * pInfo->pVloads is set to a new array, or to NULL when the array cannot be allocated
 * or when appending a record fails (the partially filled array is destroyed in that case).
 * Vnodes for which vnodeGetLoadLite() reports an error are skipped.
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-header keeps `continue` from re-visiting the same entry forever
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // Leaving the walk early: hand the in-flight iterator back to the hash
      taosHashCancelIterate(pMgmt->runngingHash, pIter);
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
106

107
/*
 * Fill pInfo with vnode statistics for the monitor and mirror them into pMgmt->state.
 *
 * A load snapshot is taken with vmGetVnodeLoads(..., true), so the per-vnode counters are
 * reset as they are read. The request counters of all vnodes are summed, leaders (including
 * assigned leaders) are counted, and tfs monitor info is fetched into pInfo->tfs.
 * Nothing is written when the snapshot array could not be created.
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo snapshot = {0};
  vmGetVnodeLoads(pMgmt, &snapshot, true);
  if (snapshot.pVloads == NULL) return;

  SArray *pLoads = snapshot.pVloads;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pLoads); ++idx) {
    SVnodeLoad *pLoad = taosArrayGet(pLoads, idx);

    selectReqs += pLoad->numOfSelectReqs;
    insertReqs += pLoad->numOfInsertReqs;
    insertOkReqs += pLoad->numOfInsertSuccessReqs;
    batchInsertReqs += pLoad->numOfBatchInsertReqs;
    batchInsertOkReqs += pLoad->numOfBatchInsertSuccessReqs;

    bool isLeader =
        (pLoad->syncState == TAOS_SYNC_STATE_LEADER) || (pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      ++leaderCnt;
    }
    ++vnodeCnt;
  }

  // Report to the monitor; the insert counters are deltas since the previous reset
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  // Keep the management state in step with what was just reported
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
155

156
/*
 * Remove tsInsertCounter samples whose vgroup id is no longer present in the running hash.
 *
 * The key list and the matching vgroup-id list are obtained from the counter, each id is
 * looked up under the read side of pMgmt->hashLock, and samples without a running vnode
 * are deleted. Both lists are released before returning.
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int numKeys = taos_counter_get_keys_size(tsInsertCounter);
  if (numKeys == 0) return;

  int32_t *vgIds;
  char   **sampleKeys;
  int      rc = taos_counter_get_vgroup_ids(tsInsertCounter, &sampleKeys, &vgIds, &numKeys);
  if (rc) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (int k = 0; k < numKeys; k++) {
    int32_t vgId = vgIds[k];
    if (taosHashGet(pMgmt->runngingHash, &vgId, sizeof(int32_t)) != NULL) {
      continue;  // vnode still running, keep its sample
    }

    rc = taos_counter_delete(tsInsertCounter, sampleKeys[k]);
    if (rc) {
      dError("failed to delete monitor sample key:%s", sampleKeys[k]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (vgIds) taosMemoryFree(vgIds);
  if (sampleKeys) taosMemoryFree(sampleKeys);
}
183

184
/*
 * Drop metrics that belong to vgroups which are no longer running on this dnode.
 *
 * Does nothing unless monitoring and metrics are enabled and a monitor endpoint is
 * configured. Otherwise the vgIds of all running vnodes are gathered into a temporary
 * set (under the read side of pMgmt->hashLock) and passed to cleanupExpiredMetrics().
 */
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Allocate the set before touching the running hash, so that an allocation failure
  // cannot leave a started iteration behind.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    int32_t vgId = pVnode->vgId;
    char    dummy = 1;  // hash table value (we only care about the key)
    if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
      dError("failed to put vgId:%d to valid vgroups hash", vgId);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
218

219
/*
 * Build a vnode configuration from a create-vnode request.
 *
 * pCfg starts as a copy of vnodeCfgDefault and is then overridden with the values carried
 * by pCreate: storage/cache sizes, tsdb and wal settings, encryption names/keys (enterprise
 * builds only), hash range, shared-storage settings and the sync membership.
 *
 * Sync membership layout in pCfg->syncCfg.nodeInfo: the voters from pCreate->replicas come
 * first, followed by the learners from pCreate->learnerReplicas. myIndex is taken from
 * selfIndex, and is overridden by replica + learnerSelfIndex when learnerSelfIndex is set.
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // pCfg->tsdbCfg.encryptAlgr = pCreate->encryptAlgr;
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgr = 0;
#endif

  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->walCfg.encryptAlgr = 0;
#endif

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tdbEncryptAlgr = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  pCfg->isAudit = pCreate->isAudit;
  pCfg->allowDrop = pCreate->allowDrop;

  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters: replicaNum counts the entries copied so far and doubles as the source index
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    SReplica  *pSrc = &pCreate->replicas[pCfg->syncCfg.replicaNum];
    pNode->nodeId = pSrc->id;
    pNode->nodePort = pSrc->port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
    // The boolean result is deliberately not acted upon here
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // Learners follow the voters; until the loop ends totalReplicaNum counts learners only
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    SReplica  *pSrc = &pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum];
    pNode->nodeId = pSrc->id;
    pNode->nodePort = pSrc->port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
2,725,042✔
325

326
/*
 * Fill the wrapper configuration of a vnode that is about to be created.
 *
 * The vnode is marked as not dropped, its id and version are copied from the request,
 * and its path is formatted as "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  pCfg->vgId = vgId;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->dropped = 0;
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
}
2,731,271✔
332

333
/*
 * Handle a create-vnode request.
 *
 * Steps: deserialize the request, verify that the replica entry addressed to this node
 * matches the local dnode id / port / fqdn, make sure the encryption key is available when
 * an encryption algorithm is requested, then create, open and start the vnode and persist
 * the vnode list.
 *
 * Returns 0 on success. When the vnode already exists (and is either single-replica or not
 * in failed state) the request is logged as an error but 0 is returned.
 * NOTE(review): presumably so that retried requests are treated as idempotent - confirm.
 * On any failure after the vnode directory was created, the vnode is closed and destroyed
 * again; terrno is set to the returned code on the paths that go through _OVER.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset:%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s, "
      "isAudit:%" PRIu8 " allowDrop:%" PRIu8,
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm, req.encryptAlgrName, req.isAudit, req.allowDrop);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Every field, including the dnode id, comes from the learner entry
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Locate the replica entry that describes this node: a voter slot takes precedence,
  // otherwise a learner slot. With neither set there is nothing valid to index.
  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else if (req.learnerSelfIndex != -1) {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica == NULL) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, neither selfIndex nor learnerSelfIndex is set in request, %s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    // Log while the request is still intact, release it afterwards
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    code = terrno;
    dError("vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgrName[0] != '\0' && strlen(tsDataKey) == 0) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
    dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // An existing vnode is reported as success to the sender
    return 0;
  }

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    // Roll back: close whatever was opened and remove the vnode directory again
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
477

478
#ifdef USE_MOUNT
479
// Record tying a database id and a vgroup id to a diskPrimary value; compareVgDiskPrimary
// orders these records by dbId first and vgId second.
// NOTE(review): diskPrimary is presumably the index of the vnode's primary disk - confirm.
typedef struct {
480
  int64_t dbId;
481
  int32_t vgId;
482
  int32_t diskPrimary;
483
} SMountDbVgId;
484
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
485
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
486

487
static int compareVnodeInfo(const void *p1, const void *p2) {
2,611✔
488
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
2,611✔
489
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
2,611✔
490

491
  if (v1->config.dbId == v2->config.dbId) {
2,611✔
492
    if (v1->config.vgId == v2->config.vgId) {
1,492✔
UNCOV
493
      return 0;
×
494
    }
495
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
1,492✔
496
  }
497

498
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
1,119✔
499
}
500
static int compareVgDiskPrimary(const void *p1, const void *p2) {
2,611✔
501
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
2,611✔
502
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
2,611✔
503

504
  if (v1->dbId == v2->dbId) {
2,611✔
505
    if (v1->vgId == v2->vgId) {
1,492✔
UNCOV
506
      return 0;
×
507
    }
508
    return v1->vgId > v2->vgId ? 1 : -1;
1,492✔
509
  }
510

511
  return v1->dbId > v2->dbId ? 1 : -1;
1,119✔
512
}
513

514
/*
 * Read the dnode-level information of a mount path into pMountInfo.
 *
 * Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount when the dnode
 * is marked dropped, when encryptScope is non-zero, or when clusterId is 0; otherwise the
 * clusterId is stored in pMountInfo->clusterId.
 * Step 2 obtains the disk list via vmGetMountDisks() and appends a duplicated copy of each
 * directory to pMountInfo->pDisks[level]; the primary disk is moved to the front of level 0
 * when that level already has entries.
 *
 * Returns 0 on success or an error code; the outcome is logged either way.
 * Strings already appended to pMountInfo->pDisks are not released here on failure.
 */
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;

  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // NOTE(review): if terrno is 0 here, execution continues with a short read - confirm intent
    TAOS_CHECK_EXIT(terrno);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;

  // dnode.json is no longer needed; release its resources before the second step
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);

  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);

    // The level comes from the mounted configuration and indexes pMountInfo->pDisks,
    // so it must lie inside the tier range before it is used.
    if (pDisk->level < 0 || pDisk->level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d for dir:%s", pReq->mountName, pDisk->level, pDisk->dir);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }

    if (!pMountInfo->pDisks[pDisk->level]) {
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[pDisk->level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // Level 0 needs at least one disk; no level may exceed the per-tier maximum
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
611

612
/**
 * Load the vnode configurations found under the mount path and group them by database.
 *
 * Steps:
 *   1. read the vnode list from "<mountPath>/<vnode dir>" via vmGetVnodeListFromFile();
 *   2. for every vnode that is not dropped: check r/w access of its path, load its info and reject
 *      replicated vnodes, vgId mismatches and encrypted vnodes;
 *   3. sort the loaded vnodes, check the cluster id and that no vgId appears twice;
 *   4. build one SMountDbInfo per database, each holding the SMountVgInfo of its vnodes.
 *
 * @param pMgmt       vnode management object (not read by this function)
 * @param pReq        retrieve request; mountName, mountPath and dnodeId are read
 * @param pMountInfo  in: pDisks[0] and clusterId; out: pDbs is set on success (may be NULL when no db is found)
 * @return 0 on success, otherwise an error code. On failure pMountInfo->pDbs is left untouched and every
 *         array allocated here is released.
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;
  int32_t      dbIdx = -1;  // index of the last SMountDbInfo whose pVgs has been assigned; used by the cleanup

  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  // pVgCfgs is pre-sized to numOfVnodes and filled in place; pDiskPrimarys grows by push
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, j = 0;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // Only the first j slots of pVgCfgs were loaded; the remaining nVgDropped slots at the tail are unused and
    // must be cut off so that pVgCfgs and pDiskPrimarys have the same length.
    // NOTE(review): relies on taosArrayRemoveBatch(array, startIndex, count, freeFp) - confirm against tarray.h
    taosArrayRemoveBatch(pVgCfgs, j, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    // both arrays are sorted so that element i of one corresponds to element i of the other below
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    // count the databases: a new one starts whenever dbId differs from the previous element
    if (dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    // two adjacent elements with the same vgId mean the vnode list is corrupted
    if (vgId == pInfo->config.vgId) {
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .isAudit = pVgCfg->config.isAudit,
          .allowDrop = pVgCfg->config.allowDrop,
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  // ownership of pDbInfos moves to pMountInfo only when everything succeeded
  pMountInfo->pDbs = pDbInfos;

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
    // pDbInfos was never handed over to pMountInfo, so it has to be released here together with the
    // per-db vgroup arrays created so far (entries 0..dbIdx)
    if (pDbInfos != NULL) {
      for (int32_t i = 0; i <= dbIdx; ++i) {
        SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
        taosArrayDestroy(pDbInfo->pVgs);
        pDbInfo->pVgs = NULL;
      }
      taosArrayDestroy(pDbInfos);
      pDbInfos = NULL;
    }
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
786

787
/**
788
 *   Retrieve the stables from vnode meta.
789
 */
790
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
245✔
791
  int32_t code = 0, lino = 0;
245✔
792
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
245✔
793
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
245✔
794
  SArray *suidList = NULL;
245✔
795
  SArray *pCols = NULL;
245✔
796
  SArray *pTags = NULL;
245✔
797
  SArray *pColExts = NULL;
245✔
798
  SArray *pTagExts = NULL;
245✔
799

800
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
245✔
801
  for (int32_t i = 0; i < nDb; ++i) {
735✔
802
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
490✔
803
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
490✔
804
    for (int32_t j = 0; j < nVg; ++j) {
490✔
805
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
490✔
806
      SVnode        vnode = {
490✔
807
                 .config.vgId = pVgInfo->vgId,
490✔
808
                 .config.dbId = pVgInfo->dbId,
490✔
809
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
490✔
810
                 .config.szPage = pVgInfo->szPage,
490✔
811
                 .config.szCache = pVgInfo->szCache,
490✔
812
                 .config.szBuf = pVgInfo->szBuf,
490✔
813
                 .config.cacheLast = pVgInfo->cacheLast,
490✔
814
                 .config.standby = pVgInfo->standby,
490✔
815
                 .config.hashMethod = pVgInfo->hashMethod,
490✔
816
                 .config.hashBegin = pVgInfo->hashBegin,
490✔
817
                 .config.hashEnd = pVgInfo->hashEnd,
490✔
818
                 .config.hashPrefix = pVgInfo->hashPrefix,
490✔
819
                 .config.hashSuffix = pVgInfo->hashSuffix,
490✔
820
                 .config.sttTrigger = pVgInfo->sttTrigger,
490✔
821
                 .config.syncCfg.replicaNum = pVgInfo->replications,
490✔
822
                 .config.tsdbCfg.precision = pVgInfo->precision,
490✔
823
                 .config.tsdbCfg.compression = pVgInfo->compression,
490✔
824
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
490✔
825
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
490✔
826
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
490✔
827
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
490✔
828
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
490✔
829
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
490✔
830
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
490✔
831
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
490✔
832
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
490✔
833
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
490✔
834
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
490✔
835
                 .config.ssCompact = pVgInfo->ssCompact,
490✔
836
                 .config.isAudit = pVgInfo->isAudit,
490✔
837
                 .config.allowDrop = pVgInfo->allowDrop,
490✔
838
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
490✔
839
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
490✔
840
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
490✔
841
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
490✔
842
                 .config.walCfg.segSize = pVgInfo->walSegSize,
490✔
843
                 .config.walCfg.level = pVgInfo->walLevel,
490✔
844
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
845
                 .diskPrimary = pVgInfo->diskPrimary,
490✔
846
      };
847
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
490✔
848
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
490✔
849
               pVgInfo->vgId);
850
      vnode.path = path;
490✔
851

852
      int32_t rollback = vnodeShouldRollback(&vnode);
490✔
853
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
490✔
UNCOV
854
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
855
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
UNCOV
856
        TAOS_CHECK_EXIT(code);
×
857
      } else {
858
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
490✔
859
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
860

861
        SMetaReader mr = {0};
490✔
862
        tb_uid_t    suid = 0;
490✔
863
        SMeta      *pMeta = vnode.pMeta;
490✔
864

865
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
490✔
866
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
490✔
UNCOV
867
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
868
        }
869
        taosArrayClear(suidList);
490✔
870
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
490✔
871
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
490✔
872
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
873
        int32_t nStbs = taosArrayGetSize(suidList);
490✔
874
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
490✔
875
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
876
        }
877
        for (int32_t i = 0; i < nStbs; ++i) {
1,960✔
878
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
1,470✔
879
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,470✔
880
                pVgInfo->dbId, suid, pReq->dnodeId);
881
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
1,470✔
UNCOV
882
            TSDB_CHECK_CODE(code, lino, _exit0);
×
883
          }
884
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
1,470✔
885
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
1,470✔
UNCOV
886
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
887
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
888
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
889
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
890
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
891
          }
892
          SMountStbInfo stbInfo = {
1,470✔
893
              .req.source = TD_REQ_FROM_APP,
894
              .req.suid = suid,
895
              .req.colVer = mr.me.stbEntry.schemaRow.version,
1,470✔
896
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
1,470✔
897
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
1,470✔
898
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
1,470✔
899
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
1,470✔
900
          };
901
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
1,470✔
902
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
1,470✔
UNCOV
903
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
904
          }
905
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
1,470✔
UNCOV
906
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
907
          }
908

909
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
1,470✔
UNCOV
910
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
911
          }
912
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
1,470✔
UNCOV
913
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
914
          }
915
          taosArrayClear(pCols);
1,470✔
916
          taosArrayClear(pTags);
1,470✔
917
          taosArrayClear(pColExts);
1,470✔
918
          taosArrayClear(pTagExts);
1,470✔
919
          stbInfo.req.pColumns = pCols;
1,470✔
920
          stbInfo.req.pTags = pTags;
1,470✔
921
          stbInfo.pColExts = pColExts;
1,470✔
922
          stbInfo.pTagExts = pTagExts;
1,470✔
923

924
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
8,820✔
925
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
7,350✔
926
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
7,350✔
927
            SFieldWithOptions col = {
7,350✔
928
                .type = pSchema->type,
7,350✔
929
                .flags = pSchema->flags,
7,350✔
930
                .bytes = pSchema->bytes,
7,350✔
931
                .compress = pColComp->alg,
7,350✔
932
            };
933
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
7,350✔
934
            if (pSchema->colId != pColComp->id) {
7,350✔
UNCOV
935
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
936
            }
937
            if (mr.me.pExtSchemas) {
7,350✔
UNCOV
938
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
939
            }
940
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
7,350✔
941
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
14,700✔
942
          }
943
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
3,430✔
944
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
1,960✔
945
            SField   tag = {
1,960✔
946
                  .type = pSchema->type,
1,960✔
947
                  .flags = pSchema->flags,
1,960✔
948
                  .bytes = pSchema->bytes,
1,960✔
949
            };
950
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
1,960✔
951
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
1,960✔
952
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
3,920✔
953
          }
954
          tDecoderClear(&mr.coder);
1,470✔
955

956
          // serialize the SMountStbInfo
957
          int32_t firstPartLen = 0;
1,470✔
958
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
1,470✔
959
          if (msgLen <= 0) {
1,470✔
UNCOV
960
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
961
          }
962
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
1,470✔
963
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
1,470✔
964
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
1,470✔
965
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
1,470✔
966
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
1,470✔
UNCOV
967
            taosMemoryFree(pBuf);
×
968
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
969
          }
970
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
2,940✔
UNCOV
971
            taosMemoryFree(pBuf);
×
UNCOV
972
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
973
          }
974
        }
975
      _exit0:
490✔
976
        metaReaderClear(&mr);
490✔
977
        metaClose(&vnode.pMeta);
490✔
978
        TAOS_CHECK_EXIT(code);
490✔
979
      }
980
      break;  // retrieve stbs from one vnode is enough
490✔
981
    }
982
  }
983
_exit:
245✔
984
  if (code != 0) {
245✔
UNCOV
985
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
986
           pReq->dnodeId, tstrerror(code), path);
987
  }
988
  taosArrayDestroy(suidList);
245✔
989
  taosArrayDestroy(pCols);
245✔
990
  taosArrayDestroy(pTags);
245✔
991
  taosArrayDestroy(pColExts);
245✔
992
  taosArrayDestroy(pTagExts);
245✔
993
  TAOS_RETURN(code);
245✔
994
}
995

996
/**
 * Open "<mountPath>/.running" and try to lock it.
 *
 * The lock is attempted at least once; after every failed attempt the function sleeps for one second,
 * logs the failure and retries until retryLimit attempts have failed.
 *
 * @param mountName   mount name, only used for logging
 * @param mountPath   directory in which the ".running" file is created/opened
 * @param pFile       out: the opened and locked file on success, NULL on failure
 * @param retryLimit  number of failed lock attempts after which the function gives up
 * @return 0 on success, otherwise terrno of the failed open or the result of the last lock attempt
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0, lino = 0;
  int32_t lockRet = 0;
  int32_t nFailed = 0;
  char    filepath[PATH_MAX] = {0};

  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);

  *pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC);
  TSDB_CHECK_NULL(*pFile, code, lino, _exit, terrno);

  for (;;) {
    lockRet = taosLockFile(*pFile);
    if (lockRet == 0) {
      break;
    }
    // the sleep comes before the log line, exactly one second per failed attempt
    taosMsleep(1000);
    ++nFailed;
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(lockRet), nFailed);
    if (nFailed >= retryLimit) {
      break;
    }
  }
  TAOS_CHECK_EXIT(lockRet);

_exit:
  if (code != 0) {
    // release the handle so that the caller never sees a half-initialized file
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           filepath);
  }
  TAOS_RETURN(code);
}
1022

1023
/**
 * Sanity checks executed before anything is read from the mount path.
 *
 * Verifies, in this order, that the mount path is accessible, that its ".running" file can be locked
 * (see vmMountCheckRunning, 3 retries), and that the dnode.json file, the mnode directory, the vnode
 * directory and the dnode config/local.json file are accessible.
 *
 * @param pMgmt       vnode management object (not read by this function)
 * @param pReq        retrieve request; mountName, mountPath and dnodeId are read
 * @param pMountInfo  out: pFile receives the locked ".running" file
 * @return 0 on success, otherwise the system error of the failed access check or the error of the lock attempt
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
  // taosCheckAccessFile() takes TD_FILE_ACCESS_* bits; O_RDONLY is an open(2) flag and does not request
  // a readability check
  const int32_t accessMode = TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK;

  // keep `path` pointing at whatever is being checked so that the error log below is never empty
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
           TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1044

1045
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
757✔
1046
                                       SMountInfo *pMountInfo) {
1047
  int32_t code = 0, lino = 0;
757✔
1048
  pMountInfo->dnodeId = pReq->dnodeId;
757✔
1049
  pMountInfo->mountUid = pReq->mountUid;
757✔
1050
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
757✔
1051
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
757✔
1052
  pMountInfo->ignoreExist = pReq->ignoreExist;
757✔
1053
  pMountInfo->valLen = pReq->valLen;
757✔
1054
  pMountInfo->pVal = pReq->pVal;
757✔
1055
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
757✔
1056
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
373✔
1057
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
373✔
1058
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
245✔
1059
_exit:
245✔
1060
  if (code != 0) {
757✔
1061
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
512✔
1062
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1063
  }
1064
  TAOS_RETURN(code);
757✔
1065
}
1066

1067
/**
 * Handle a retrieve-mount-path request.
 *
 * The request is deserialized and processed by vmRetrieveMountPathImpl() on a copy of pMgmt whose path
 * is cleared. Whatever the outcome, the (possibly partial) mount info is serialized into a response
 * buffer which is attached to pMsg->info.
 *
 * Two error codes are tracked:
 *   - code:    failure of the retrieval itself; the response is still built
 *   - rspCode: failure while building the response; no response is attached and rspCode is returned
 *
 * @param pMgmt  vnode management object
 * @param pMsg   in: pCont/contLen hold the serialized request; out: info.rsp/info.rspLen
 * @return 0 on success, otherwise rspCode if the response could not be built, else the retrieval error
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0, lino = 0;
  int32_t               rspCode = 0;
  SRetrieveMountPathReq req = {0};
  SMountInfo            mountInfo = {0};
  int32_t               rspLen = 0;
  void                 *pRsp = NULL;
  SVnodeMgmt            vndMgmt = *pMgmt;  // private copy, so that the stages never touch pMgmt's path

  vndMgmt.path = NULL;

  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // first pass computes the size, second pass writes into the rpc buffer
  TSDB_CHECK_CONDITION((rspLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, rspLen);
  TSDB_CHECK_CONDITION((pRsp = rpcMallocCont(rspLen)), rspCode, lino, _exit, terrno);
  TSDB_CHECK_CONDITION((rspLen = tSerializeSMountInfo(pRsp, rspLen, &mountInfo)) >= 0, rspCode, lino, _exit, rspLen);
  pMsg->info.rspLen = rspLen;
  pMsg->info.rsp = pRsp;

_exit:
  if (rspCode == 0 && code == 0) {
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
    int32_t nVgs = 0;
    for (int32_t d = 0; d < nDbs; ++d) {
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, d);
      nVgs += taosArrayGetSize(pDb->pVgs);
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
  } else if (rspCode != 0) {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pRsp);
    code = rspCode;
  } else {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  }

  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1111

1112
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
980✔
1113
                            SMountVnodeReq *req, STfs *pMountTfs) {
1114
  int32_t    code = 0;
980✔
1115
  SVnodeInfo info = {0};
980✔
1116
  char       hostDir[TSDB_FILENAME_LEN] = {0};
980✔
1117
  char       mountDir[TSDB_FILENAME_LEN] = {0};
980✔
1118
  char       mountVnode[32] = {0};
980✔
1119

1120
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
980✔
UNCOV
1121
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
UNCOV
1122
    return code;
×
1123
  }
1124

1125
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
980✔
1126
  if ((code = taosMkDir(hostDir))) {
980✔
UNCOV
1127
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1128
           tstrerror(code), hostDir);
1129
    return code;
×
1130
  }
1131

1132
  info.config = *pCfg;  // copy the config
980✔
1133
  info.state.committed = req->committed;
980✔
1134
  info.state.commitID = req->commitID;
980✔
1135
  info.state.commitTerm = req->commitTerm;
980✔
1136
  info.state.applied = req->committed;
980✔
1137
  info.state.applyTerm = req->commitTerm;
980✔
1138
  info.config.vndStats.numOfSTables = req->numOfSTables;
980✔
1139
  info.config.vndStats.numOfCTables = req->numOfCTables;
980✔
1140
  info.config.vndStats.numOfNTables = req->numOfNTables;
980✔
1141

1142
  SVnodeInfo oldInfo = {0};
980✔
1143
  oldInfo.config = vnodeCfgDefault;
980✔
1144
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
980✔
UNCOV
1145
    if (oldInfo.config.dbId != info.config.dbId) {
×
UNCOV
1146
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
UNCOV
1147
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1148
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1149
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1150
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1151
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1152

1153
    } else {
1154
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1155
    }
UNCOV
1156
    return code;
×
1157
  }
1158

1159
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
980✔
1160
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
980✔
1161
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
980✔
1162
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
980✔
1163
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1164
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
5,880✔
1165
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
4,900✔
1166
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
4,900✔
1167
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
4,900✔
UNCOV
1168
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1169
             mountSubDir, hostSubDir, tstrerror(code));
UNCOV
1170
      return code;
×
1171
    }
1172
  }
1173
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
980✔
1174
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
980✔
UNCOV
1175
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1176
           req->mountName, tstrerror(code), hostDir);
UNCOV
1177
    return code;
×
1178
  }
1179
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
980✔
1180
  return 0;
980✔
1181
}
1182

1183
// Handles a mount-vnode request: decodes the request, verifies that it targets this dnode,
// prepares the host vnode directory via vmMountVnode(), then opens and starts the vnode and
// persists the vnode and mount lists.
// The result code is stored in pMsg->code and returned; no response body is attached.
// An already existing vnode is reported as success (returns 0) without touching pMsg.
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // Log the learner's own dnode id (the original printed replicas[i].id here).
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  // Reject an out-of-range self index before it is used to index the replica arrays.
  bool validSelf = false;
  if (pCreateReq->selfIndex != -1) {
    validSelf = (pCreateReq->selfIndex >= 0 && pCreateReq->selfIndex < pCreateReq->replica);
  } else {
    validSelf = (pCreateReq->learnerSelfIndex >= 0 && pCreateReq->learnerSelfIndex < pCreateReq->learnerReplica);
  }
  if (!validSelf) {
    code = TSDB_CODE_INVALID_MSG;
    dError("mount:%s, vgId:%d, invalid selfIndex:%d or learnerSelfIndex:%d in request, reason:%s", req.mountName,
           pCreateReq->vgId, pCreateReq->selfIndex, pCreateReq->learnerSelfIndex, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (pCreateReq->selfIndex != -1) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // Log first, free afterwards: the message reads fields of req.
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    // Capture terrno before any other call gets a chance to overwrite it.
    code = terrno;
    dError("mount:%s, vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.mountName,
           pCreateReq->vgId, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): the original assigned TSDB_CODE_VND_ALREADY_EXIST to `code` here and then
    // returned 0 anyway; the return value is kept. Confirm that reporting success is intended.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;  // from here on the error path must give the mount TFS back

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1285
#endif  // USE_MOUNT
1286

1287
// alter replica doesn't use this, but restore dnode still uses this
1288
// Handles an alter-vnode-type request. The vnode must exist, must not already be a voter and
// must have caught up; the request must be well formed and address this dnode. When all checks
// pass the vnode is closed, its replica config is rewritten on disk, and it is reopened and started.
// Returns 0 on success; on failure returns -1, with terrno set explicitly for the validation failures.
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner COUNT. The original compared the learnerReplicas array itself against 0,
  // which can never be true, so learnerSelfIndex was never reset for requests without learners.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // This dnode must appear either as a voter or as a learner, with an in-range index.
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot what is needed to reopen the vnode before the object is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  // pVnode is not released explicitly after this call, on any path.
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
1407

1408
// Handles a check-learner-catchup request: succeeds only when the vnode exists, is not already
// a voter, and vnodeIsCatchUp() reports 1. Nothing is modified on the vnode.
// Returns 0 on success; otherwise -1 with terrno set to the reason.
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner COUNT; the original compared the learnerReplicas array against 0,
  // which is never true.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1455

1456
// Handles a disable-vnode-write request by copying the request's `disable` value into the
// vnode object's `disable` field.
// Returns 0 on success; -1 with terrno set when the message cannot be decoded
// (TSDB_CODE_INVALID_MSG) or the vnode cannot be acquired (TSDB_CODE_VND_NOT_EXIST).
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // Nothing to release here: the acquire failed.
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->disable = req.disable;
  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1477

1478
// Handles a set-keep-version request. The message starts with an SMsgHead whose contLen and vgId
// are converted in place with ntohl(); the payload after the header carries the keep version,
// which is applied directly through vnodeSetWalKeepVersion().
// Returns 0 on success; -1 with terrno set on a malformed message, an unknown vnode, or a
// failure reported by vnodeSetWalKeepVersion().
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // The header is dereferenced and its size subtracted from contLen below, so the message
  // must hold at least a full header.
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMsgHead)) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("failed to set keep version since msg is too short, contLen:%d", pMsg->contLen);
    return -1;
  }

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVndSetKeepVersionReq req = {0};
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                                        &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
  // This bypasses Raft to avoid timing issues where WAL might be deleted
  // before keepVersion is set through the Raft consensus process
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
  if (code != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
    terrno = code;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);

  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1515

1516
// Handles an alter-hash-range request, which turns the source vnode (srcVgId) into a vnode with
// id dstVgId. Steps: check that dstVgId is not in use, mark the source vnode with its target id
// and persist the vnode list, close the source vnode, rewrite it on disk via
// vnodeAlterHashRange(), then open and start the destination vnode and persist the list again.
// Returns 0 on success; -1 on failure (terrno is set explicitly for the validation failures).
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  // The destination id must be free.
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Snapshot what is needed to open the destination vnode before the source object is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // Undo the in-memory mark and give back the reference taken above; the vnode is still open.
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  // pVnode is not released explicitly after this call, on any path.
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1601

1602
// Handles an alter-vnode-replica request: validates the request and that it addresses this
// dnode, closes the vnode, rewrites its replica config on disk via vnodeAlterReplica(), then
// reopens and starts it.
// Returns 0 on success; -1 on failure (terrno is set explicitly for the validation failures).
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  // The trailing "dnode:%d" field takes the replica's dnode id (the original passed the port twice).
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // This dnode must appear either as a voter or as a learner, with an in-range index.
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot what is needed to reopen the vnode before the object is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  // pVnode is not released explicitly after this call, on any path.
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1705

1706
// Handles an alter-elect-baseline request by passing the requested electBaseLine value to
// vnodeSetElectBaseline() for the addressed vnode.
// Returns 0 on success, TSDB_CODE_INVALID_MSG when the message cannot be decoded, terrno when
// the vnode cannot be acquired, and -1 when vnodeSetElectBaseline() fails.
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeElectBaselineReq baselineReq = {0};

  int32_t decodeRet = tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &baselineReq);
  if (decodeRet != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  const int32_t targetVgId = baselineReq.vgId;
  dInfo("vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d", targetVgId,
        TMSG_INFO(pMsg->msgType), baselineReq.electBaseLine);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, targetVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter replica since %s", targetVgId, terrstr());
    return terrno;
  }

  // Apply the new baseline, then drop the reference regardless of the outcome.
  int32_t result = 0;
  if (vnodeSetElectBaseline(pVnode->pImpl, baselineReq.electBaseLine) != 0) {
    result = -1;
  }
  vmReleaseVnode(pMgmt, pVnode);
  return result;
}
1731

1732
// Handles a drop-vnode request: marks the vnode as dropped, persists the vnode list, closes the
// vnode, and persists the list once more.
// Returns 0 on success. Returns terrno after setting it when the message is invalid, targets
// another dnode, or names an unknown vnode; returns the code of the first list write when that
// write fails (the dropped mark is rolled back). A failure of the second list write is only logged.
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq req = {0};

  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  const int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  // The request must be addressed to this dnode.
  if (req.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d, %s", vgId, req.dnodeId, tstrerror(terrno));
    return terrno;
  }

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (pVnode == NULL) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // Record the drop first so the persisted list reflects it before the vnode is closed.
  pVnode->dropped = 1;
  int32_t writeRet = vmWriteVnodeListToFile(pMgmt);
  if (writeRet != 0) {
    pVnode->dropped = 0;
    vmReleaseVnode(pMgmt, pVnode);
    return writeRet;
  }

  vmCloseVnode(pMgmt, pVnode, false, false);

  // Best effort: a failure to rewrite the list after the close does not fail the request.
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
1771

1772
/*
 * Process an arbitrator heartbeat request and build the reply.
 *
 * The request is decoded from pMsg->pCont. It is rejected when its dnodeId
 * differs from the local dnode id or when its arb token is empty. For every
 * requested member whose vnode can be acquired, the vnode's arb token is read,
 * its arb term is updated from the request, and a response member is appended.
 * Members whose vnode is missing, or for which either vnode call fails, are
 * logged and left out of the reply.
 *
 * On success the serialized reply is stored in pMsg->info.rsp / rspLen.
 *
 * Returns -1 (with terrno = TSDB_CODE_INVALID_MSG) when decoding fails;
 * otherwise returns terrno, which is TSDB_CODE_SUCCESS only when the reply
 * was attached to pMsg.
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    // Release whatever the decoder may have allocated before it failed.
    // NOTE(review): assumes tFreeSVArbHeartBeatReq tolerates a partially
    // decoded request (the struct starts zero-initialised) - confirm.
    tFreeSVArbHeartBeatReq(&arbHbReq);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // Set the code explicitly so a stale or zero terrno cannot be returned.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
1861

1862
SArray *vmGetMsgHandles() {
556,972✔
1863
  int32_t code = -1;
556,972✔
1864
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
556,972✔
1865
  if (pArray == NULL) goto _OVER;
556,972✔
1866

1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1868
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
556,972✔
1869
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
556,972✔
1870
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
556,972✔
1871
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1872
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1873
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1874
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1875
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1877
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1878
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1881
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1882
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1883
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1884
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1885
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1886
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1887
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1891
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1892
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1893
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1894
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1895
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1896
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
556,972✔
1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
556,972✔
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1899
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1900
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1901
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1902
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1903
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1904
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1905
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1906
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1907
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1908
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1909
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1910
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1911
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1912
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1913
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1914
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1915
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1916

1917
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1918
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1919
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1920
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1921
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1922
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1923
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1924
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1925
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1926
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1927
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1928
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
556,972✔
1929
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1930
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1931
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1932
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1933
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1934
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1935
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1936
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1937
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1938
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1939
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1940

1941
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1942
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1943
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1944
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1945
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1946
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1947
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1948
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1949
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1950
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1951
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1952
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1953

1954
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
556,972✔
1955
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
556,972✔
1956
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
556,972✔
1957
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
556,972✔
1958
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
556,972✔
1959

1960
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
556,972✔
1961
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
556,972✔
1962
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
556,972✔
1963

1964
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
556,972✔
1965
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
556,972✔
1966
  code = 0;
556,972✔
1967

1968
_OVER:
556,972✔
1969
  if (code != 0) {
556,972✔
UNCOV
1970
    taosArrayDestroy(pArray);
×
UNCOV
1971
    return NULL;
×
1972
  } else {
1973
    return pArray;
556,972✔
1974
  }
1975
}
1976

UNCOV
1977
/*
 * Publish the raw write metrics of every non-failed vnode in the running hash
 * and reset them once they have been published.
 *
 * pMgmt     - vnode management object; its hashLock is read-locked for the
 *             whole traversal.
 * clusterId - cluster id passed through to addWriteMetrics.
 *
 * Failures for an individual vnode are logged and the traversal moves on to
 * the next entry.
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header so that every `continue`
  // below moves to the next entry instead of re-visiting the same one forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) {
      continue;
    }

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    // Add the metrics to the global metrics system with cluster ID
    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // Only reset the vnode's counters after they were successfully published.
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc