• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4876

10 Dec 2025 05:56AM UTC coverage: 64.632% (+0.2%) from 64.472%
#4876

push

travis-ci

guanshengliang
test: fix idmp case with checkDataMemLoop checked (#33862)

4 of 9 new or added lines in 3 files covered. (44.44%)

380 existing lines in 104 files now uncovered.

162866 of 251990 relevant lines covered (64.63%)

107950382.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.16
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
56,302,092✔
30
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
56,302,092✔
31
  if (pInfo->pVloads == NULL) return;
56,302,092✔
32

33
  tfsUpdateSize(pMgmt->pTfs);
56,302,092✔
34

35
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
56,302,092✔
36

37
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
56,302,092✔
38
  while (pIter) {
206,896,412✔
39
    SVnodeObj **ppVnode = pIter;
150,594,320✔
40
    if (ppVnode == NULL || *ppVnode == NULL) continue;
150,594,320✔
41

42
    SVnodeObj *pVnode = *ppVnode;
150,594,320✔
43
    SVnodeLoad vload = {.vgId = pVnode->vgId};
150,594,320✔
44
    if (!pVnode->failed) {
150,594,320✔
45
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
150,594,320✔
46
        dError("failed to get vnode load");
×
47
      }
48
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
150,594,320✔
49
    }
50
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
301,188,640✔
51
      dError("failed to push vnode load");
×
52
    }
53
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
150,594,320✔
54
  }
55

56
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
56,302,092✔
57
}
58

59
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
×
60
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
61

62
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
63
  while (pIter) {
×
64
    SVnodeObj **ppVnode = pIter;
×
65
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
66

67
    SVnodeObj *pVnode = *ppVnode;
×
68

69
    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
×
70
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
×
71
    }
72
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
73
  }
74

75
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
76
}
×
77

78
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
79
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
80
  if (!pInfo->pVloads) return;
×
81

82
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
83

84
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
85
  while (pIter) {
×
86
    SVnodeObj **ppVnode = pIter;
×
87
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
88

89
    SVnodeObj *pVnode = *ppVnode;
×
90
    if (!pVnode->failed) {
×
91
      SVnodeLoadLite vload = {0};
×
92
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
93
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
94
          taosArrayDestroy(pInfo->pVloads);
×
95
          pInfo->pVloads = NULL;
×
96
          break;
×
97
        }
98
      }
99
    }
100
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
101
  }
102

103
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
104
}
105

106
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
250✔
107
  SMonVloadInfo vloads = {0};
250✔
108
  vmGetVnodeLoads(pMgmt, &vloads, true);
250✔
109

110
  SArray *pVloads = vloads.pVloads;
250✔
111
  if (pVloads == NULL) return;
250✔
112

113
  int32_t totalVnodes = 0;
250✔
114
  int32_t masterNum = 0;
250✔
115
  int64_t numOfSelectReqs = 0;
250✔
116
  int64_t numOfInsertReqs = 0;
250✔
117
  int64_t numOfInsertSuccessReqs = 0;
250✔
118
  int64_t numOfBatchInsertReqs = 0;
250✔
119
  int64_t numOfBatchInsertSuccessReqs = 0;
250✔
120

121
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
1,250✔
122
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
1,000✔
123
    numOfSelectReqs += pLoad->numOfSelectReqs;
1,000✔
124
    numOfInsertReqs += pLoad->numOfInsertReqs;
1,000✔
125
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
1,000✔
126
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
1,000✔
127
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
1,000✔
128
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,000✔
129
      masterNum++;
1,000✔
130
    }
131
    totalVnodes++;
1,000✔
132
  }
133

134
  pInfo->vstat.totalVnodes = totalVnodes;
250✔
135
  pInfo->vstat.masterNum = masterNum;
250✔
136
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
250✔
137
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
250✔
138
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
250✔
139
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
250✔
140
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
250✔
141
  pMgmt->state.totalVnodes = totalVnodes;
250✔
142
  pMgmt->state.masterNum = masterNum;
250✔
143
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
250✔
144
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
250✔
145
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
250✔
146
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
250✔
147
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
250✔
148

149
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
250✔
150
    dError("failed to get tfs monitor info");
×
151
  }
152
  taosArrayDestroy(pVloads);
250✔
153
}
154

155
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
250✔
156
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
250✔
157
  if (list_size == 0) return;
250✔
158
  int32_t *vgroup_ids;
×
159
  char   **keys;
×
160
  int      r = 0;
×
161
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
162
  if (r) {
×
163
    dError("failed to get vgroup ids");
×
164
    return;
×
165
  }
166
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
167
  for (int i = 0; i < list_size; i++) {
×
168
    int32_t vgroup_id = vgroup_ids[i];
×
169
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
170
    if (vnode == NULL) {
×
171
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
172
      if (r) {
×
173
        dError("failed to delete monitor sample key:%s", keys[i]);
×
174
      }
175
    }
176
  }
177
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
178
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
179
  if (keys) taosMemoryFree(keys);
×
180
  return;
×
181
}
182

183
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
184
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
185
    return;
×
186
  }
187

188
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
189
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
190
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
191
  if (pValidVgroups == NULL) {
×
192
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
193
    return;
×
194
  }
195

196
  while (pIter != NULL) {
×
197
    SVnodeObj **ppVnode = pIter;
×
198
    if (ppVnode && *ppVnode) {
×
199
      int32_t vgId = (*ppVnode)->vgId;
×
200
      char    dummy = 1;  // hash table value (we only care about the key)
×
201
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
202
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
203
      }
204
    }
205
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
206
  }
207
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
208

209
  // Clean expired metrics by removing metrics for non-existent vgroups
210
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
211
  if (code != TSDB_CODE_SUCCESS) {
×
212
    dError("failed to clean expired metrics, code:%d", code);
×
213
  }
214

215
  taosHashCleanup(pValidVgroups);
×
216
}
217

218
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
2,989,589✔
219
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
2,989,589✔
220

221
  pCfg->vgId = pCreate->vgId;
2,989,589✔
222
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
2,987,859✔
223
  pCfg->dbId = pCreate->dbUid;
2,989,589✔
224
  pCfg->szPage = pCreate->pageSize * 1024;
2,989,589✔
225
  pCfg->szCache = pCreate->pages;
2,975,927✔
226
  pCfg->cacheLast = pCreate->cacheLast;
2,988,801✔
227
  pCfg->cacheLastSize = pCreate->cacheLastSize;
2,988,700✔
228
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
2,989,488✔
229
  pCfg->isWeak = true;
2,989,359✔
230
  pCfg->isTsma = pCreate->isTsma;
2,989,258✔
231
  pCfg->tsdbCfg.compression = pCreate->compression;
2,987,612✔
232
  pCfg->tsdbCfg.precision = pCreate->precision;
2,988,201✔
233
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
2,987,333✔
234
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
2,987,916✔
235
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
2,986,463✔
236
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
2,987,259✔
237
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
2,984,401✔
238
  pCfg->tsdbCfg.minRows = pCreate->minRows;
2,986,495✔
239
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
2,985,961✔
240
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
241
  // pCfg->tsdbCfg.encryptAlgr = pCreate->encryptAlgr;
242
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
2,988,450✔
243
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
2,988,251✔
244
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3,270✔
245
  }
246
#else
247
  pCfg->tsdbCfg.encryptAlgr = 0;
248
#endif
249

250
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
2,982,855✔
251
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
2,982,524✔
252
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
2,982,471✔
253
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
2,985,954✔
254
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
2,984,118✔
255
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
2,985,449✔
256
  pCfg->walCfg.level = pCreate->walLevel;
2,983,274✔
257
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
258
  // pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
259
  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
2,984,628✔
260
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
2,988,270✔
261
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1,413✔
262
  }
263
#else
264
  pCfg->walCfg.encryptAlgr = 0;
265
#endif
266

267
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
268
  // pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
269
  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
2,983,646✔
270
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
2,984,776✔
271
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
4,779✔
272
  }
273
#else
274
  pCfg->tdbEncryptAlgr = 0;
275
#endif
276

277
  pCfg->sttTrigger = pCreate->sstTrigger;
2,984,422✔
278
  pCfg->hashBegin = pCreate->hashBegin;
2,980,954✔
279
  pCfg->hashEnd = pCreate->hashEnd;
2,983,198✔
280
  pCfg->hashMethod = pCreate->hashMethod;
2,981,345✔
281
  pCfg->hashPrefix = pCreate->hashPrefix;
2,983,077✔
282
  pCfg->hashSuffix = pCreate->hashSuffix;
2,982,821✔
283
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
2,982,378✔
284

285
  pCfg->ssChunkSize = pCreate->ssChunkSize;
2,983,409✔
286
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
2,986,407✔
287
  pCfg->ssCompact = pCreate->ssCompact;
2,984,934✔
288

289
  pCfg->standby = 0;
2,980,237✔
290
  pCfg->syncCfg.replicaNum = 0;
2,983,629✔
291
  pCfg->syncCfg.totalReplicaNum = 0;
2,980,339✔
292
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
2,982,520✔
293

294
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
2,981,282✔
295
  for (int32_t i = 0; i < pCreate->replica; ++i) {
7,155,443✔
296
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
4,172,166✔
297
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
4,168,502✔
298
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
4,167,265✔
299
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
4,168,649✔
300
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
4,166,096✔
301
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
4,169,928✔
302
    pCfg->syncCfg.replicaNum++;
4,173,127✔
303
  }
304
  if (pCreate->selfIndex != -1) {
2,985,730✔
305
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
2,888,625✔
306
  }
307
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
3,084,227✔
308
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
97,128✔
309
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
97,128✔
310
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
97,128✔
311
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
97,128✔
312
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
97,128✔
313
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
97,128✔
314
    pCfg->syncCfg.totalReplicaNum++;
97,128✔
315
  }
316
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
2,984,279✔
317
  if (pCreate->learnerSelfIndex != -1) {
2,984,995✔
318
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
97,128✔
319
  }
320
}
2,985,041✔
321

322
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
2,980,140✔
323
  pCfg->vgId = pCreate->vgId;
2,980,140✔
324
  pCfg->vgVersion = pCreate->vgVersion;
2,979,282✔
325
  pCfg->dropped = 0;
2,980,979✔
326
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
2,977,317✔
327
}
2,983,628✔
328

329
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,982,164✔
330
  SCreateVnodeReq req = {0};
2,982,164✔
331
  SVnodeCfg       vnodeCfg = {0};
2,987,807✔
332
  SWrapperCfg     wrapperCfg = {0};
2,985,719✔
333
  int32_t         code = -1;
2,986,150✔
334
  char            path[TSDB_FILENAME_LEN] = {0};
2,986,150✔
335

336
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,985,539✔
337
    return TSDB_CODE_INVALID_MSG;
×
338
  }
339

340
  if (req.learnerReplica == 0) {
2,987,807✔
341
    req.learnerSelfIndex = -1;
2,890,679✔
342
  }
343

344
  dInfo(
2,987,807✔
345
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
346
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
347
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
348
      "precision:%d compression:%d minRows:%d maxRows:%d"
349
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
350
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
351
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s",
352
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
353
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
354
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
355
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
356
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
357
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
358
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
359
      req.encryptAlgorithm, req.encryptAlgrName);
360

361
  for (int32_t i = 0; i < req.replica; ++i) {
7,162,223✔
362
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
4,174,416✔
363
          req.replicas[i].id);
364
  }
365
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
3,084,935✔
366
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
97,128✔
367
          req.learnerReplicas[i].port, req.replicas[i].id);
368
  }
369

370
  SReplica *pReplica = NULL;
2,987,807✔
371
  if (req.selfIndex != -1) {
2,987,807✔
372
    pReplica = &req.replicas[req.selfIndex];
2,890,679✔
373
  } else {
374
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
97,128✔
375
  }
376
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
2,987,807✔
377
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
2,987,807✔
UNCOV
378
    (void)tFreeSCreateVnodeReq(&req);
×
379

380
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
381
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
×
382
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
383
    return code;
×
384
  }
385

386
  if (req.encryptAlgrName[0] != '\0') {
2,987,807✔
387
    if (strlen(tsEncryptKey) == 0) {
2,412✔
388
      (void)tFreeSCreateVnodeReq(&req);
766✔
389
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
766✔
390
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
766✔
391
      return code;
766✔
392
    }
393
  }
394

395
  vmGenerateVnodeCfg(&req, &vnodeCfg);
2,987,041✔
396

397
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
2,978,309✔
398

399
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
2,985,153✔
400
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
2,985,628✔
401
    dError("vgId:%d, already exist", req.vgId);
43,184✔
402
    (void)tFreeSCreateVnodeReq(&req);
43,184✔
403
    vmReleaseVnode(pMgmt, pVnode);
43,184✔
404
    code = TSDB_CODE_VND_ALREADY_EXIST;
43,184✔
405
    return 0;
43,184✔
406
  }
407

408
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
2,942,444✔
409
  if (diskPrimary < 0) {
2,941,552✔
410
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
2,941,552✔
411
  }
412
  wrapperCfg.diskPrimary = diskPrimary;
2,943,857✔
413

414
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
2,943,857✔
415

416
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
2,943,857✔
417
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
418
    vmReleaseVnode(pMgmt, pVnode);
×
419
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
420
    (void)tFreeSCreateVnodeReq(&req);
×
421
    return code;
×
422
  }
423

424
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
2,943,857✔
425
  if (pImpl == NULL) {
2,943,502✔
426
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
427
    code = terrno != 0 ? terrno : -1;
×
428
    goto _OVER;
×
429
  }
430

431
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
2,943,502✔
432
  if (code != 0) {
2,943,857✔
433
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
434
    code = terrno != 0 ? terrno : code;
×
435
    goto _OVER;
×
436
  }
437

438
  code = vnodeStart(pImpl);
2,943,857✔
439
  if (code != 0) {
2,943,857✔
440
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
441
    goto _OVER;
×
442
  }
443

444
  code = vmWriteVnodeListToFile(pMgmt);
2,943,857✔
445
  if (code != 0) {
2,943,857✔
446
    code = terrno != 0 ? terrno : code;
×
447
    goto _OVER;
×
448
  }
449

450
_OVER:
2,943,857✔
451
  vmCleanPrimaryDisk(pMgmt, req.vgId);
2,943,857✔
452

453
  if (code != 0) {
2,943,857✔
454
    vmCloseFailedVnode(pMgmt, req.vgId);
×
455

456
    vnodeClose(pImpl);
×
457
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
458
  } else {
459
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
2,943,857✔
460
          TMSG_INFO(pMsg->msgType));
461
  }
462

463
  (void)tFreeSCreateVnodeReq(&req);
2,943,857✔
464
  terrno = code;
2,943,857✔
465
  return code;
2,943,857✔
466
}
467

468
#ifdef USE_MOUNT
469
typedef struct {
470
  int64_t dbId;
471
  int32_t vgId;
472
  int32_t diskPrimary;
473
} SMountDbVgId;
474
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
475
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
476

477
static int compareVnodeInfo(const void *p1, const void *p2) {
6,706✔
478
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
6,706✔
479
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
6,706✔
480

481
  if (v1->config.dbId == v2->config.dbId) {
6,706✔
482
    if (v1->config.vgId == v2->config.vgId) {
3,832✔
483
      return 0;
×
484
    }
485
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
3,832✔
486
  }
487

488
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
2,874✔
489
}
490
static int compareVgDiskPrimary(const void *p1, const void *p2) {
6,706✔
491
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
6,706✔
492
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
6,706✔
493

494
  if (v1->dbId == v2->dbId) {
6,706✔
495
    if (v1->vgId == v2->vgId) {
3,832✔
496
      return 0;
×
497
    }
498
    return v1->vgId > v2->vgId ? 1 : -1;
3,832✔
499
  }
500

501
  return v1->dbId > v2->dbId ? 1 : -1;
2,874✔
502
}
503

504
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
958✔
505
  int32_t   code = 0, lino = 0;
958✔
506
  TdFilePtr pFile = NULL;
958✔
507
  char     *content = NULL;
958✔
508
  SJson    *pJson = NULL;
958✔
509
  int64_t   size = 0;
958✔
510
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
958✔
511
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
958✔
512
  SArray   *pDisks = NULL;
958✔
513
  // step 1: fetch clusterId from dnode.json
514
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
958✔
515
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
958✔
516
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
958✔
517
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
958✔
518
  if (taosReadFile(pFile, content, size) != size) {
958✔
519
    TAOS_CHECK_EXIT(terrno);
×
520
  }
521
  content[size] = '\0';
958✔
522
  pJson = tjsonParse(content);
958✔
523
  if (pJson == NULL) {
958✔
524
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
525
  }
526
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
958✔
527
  TAOS_CHECK_EXIT(code);
958✔
528
  if (dropped == 1) {
958✔
529
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
530
  }
531
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
958✔
532
  TAOS_CHECK_EXIT(code);
958✔
533
  if (encryptScope != 0) {
958✔
534
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
535
  }
536
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
958✔
537
  TAOS_CHECK_EXIT(code);
958✔
538
  if (clusterId == 0) {
958✔
539
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
540
  }
541
  pMountInfo->clusterId = clusterId;
958✔
542
  if (content != NULL) taosMemoryFreeClear(content);
958✔
543
  if (pJson != NULL) {
958✔
544
    cJSON_Delete(pJson);
958✔
545
    pJson = NULL;
958✔
546
  }
547
  if (pFile != NULL) taosCloseFile(&pFile);
958✔
548
  // step 2: fetch dataDir from dnode/config/local.json
549
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
958✔
550
  int32_t nDisks = taosArrayGetSize(pDisks);
958✔
551
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
958✔
552
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
553
  }
554
  for (int32_t i = 0; i < nDisks; ++i) {
6,706✔
555
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
5,748✔
556
    if (!pMountInfo->pDisks[pDisk->level]) {
5,748✔
557
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
1,916✔
558
      if (!pMountInfo->pDisks[pDisk->level]) {
1,916✔
559
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
560
      }
561
    }
562
    char *pDir = taosStrdup(pDisk->dir);
5,748✔
563
    if (pDir == NULL) {
5,748✔
564
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
565
    }
566
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
5,748✔
567
      // put the primary disk to the first position of level 0
568
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
958✔
569
        taosMemFree(pDir);
×
570
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
571
      }
572
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
9,580✔
573
      taosMemFree(pDir);
×
574
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
575
    }
576
  }
577
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
3,832✔
578
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
2,874✔
579
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
2,874✔
580
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
581
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
582
    }
583
  }
584
_exit:
958✔
585
  if (content != NULL) taosMemoryFreeClear(content);
958✔
586
  if (pJson != NULL) cJSON_Delete(pJson);
958✔
587
  if (pFile != NULL) taosCloseFile(&pFile);
958✔
588
  if (pDisks != NULL) {
958✔
589
    taosArrayDestroy(pDisks);
958✔
590
    pDisks = NULL;
958✔
591
  }
592
  if (code != 0) {
958✔
593
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
594
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
595
  } else {
596
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
958✔
597
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
598
  }
599
  TAOS_RETURN(code);
958✔
600
}
601

602
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
958✔
603
  int32_t       code = 0, lino = 0;
958✔
604
  SWrapperCfg  *pCfgs = NULL;
958✔
605
  int32_t       numOfVnodes = 0;
958✔
606
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
958✔
607
  TdDirPtr      pDir = NULL;
958✔
608
  TdDirEntryPtr de = NULL;
958✔
609
  SVnodeMgmt    vnodeMgmt = {0};
958✔
610
  SArray       *pVgCfgs = NULL;
958✔
611
  SArray       *pDbInfos = NULL;
958✔
612
  SArray       *pDiskPrimarys = NULL;
958✔
613

614
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
958✔
615
  vnodeMgmt.path = path;
958✔
616
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
958✔
617
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
958✔
618
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
958✔
619
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
958✔
620

621
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
958✔
622
  int32_t nVgDropped = 0, j = 0;
958✔
623
  for (int32_t i = 0; i < numOfVnodes; ++i) {
4,790✔
624
    SWrapperCfg *pCfg = &pCfgs[i];
3,832✔
625
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
626
    if (nDiskLevel0 > 1) {
3,832✔
627
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
3,832✔
628
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
3,832✔
629
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
3,832✔
630
                     pCfg->vgId);
631
    }
632
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
3,832✔
633
    if (pCfg->dropped) {
3,832✔
634
      ++nVgDropped;
×
635
      continue;
×
636
    }
637
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
3,832✔
638
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
639
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
640
    }
641
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
3,832✔
642
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
3,832✔
643
    if (pInfo->config.syncCfg.replicaNum > 1) {
3,832✔
644
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
645
             pInfo->config.syncCfg.replicaNum);
646
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
647
    } else if (pInfo->config.vgId != pCfg->vgId) {
3,832✔
648
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
649
             pCfg->vgId);
650
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
651
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
3,832✔
652
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
3,832✔
653
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
3,832✔
654
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
×
655
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
656
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
657
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
658
    }
659
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
3,832✔
660
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
3,832✔
661
  }
662
  if (nVgDropped > 0) {
958✔
663
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
664
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
665
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
666
  }
667
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
958✔
668
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
958✔
669
  if (nVgCfg != nDiskPrimary) {
958✔
670
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
671
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
672
  }
673
  if (nVgCfg > 1) {
958✔
674
    taosArraySort(pVgCfgs, compareVnodeInfo);
958✔
675
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
958✔
676
  }
677

678
  int64_t clusterId = pMountInfo->clusterId;
958✔
679
  int64_t dbId = 0, vgId = 0, nDb = 0;
958✔
680
  for (int32_t i = 0; i < nVgCfg; ++i) {
3,506✔
681
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
2,869✔
682
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
2,869✔
683
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
321✔
684
             pInfo->config.syncCfg.nodeInfo->clusterId);
685
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
321✔
686
    }
687
    if (dbId != pInfo->config.dbId) {
2,548✔
688
      dbId = pInfo->config.dbId;
1,274✔
689
      ++nDb;
1,274✔
690
    }
691
    if (vgId == pInfo->config.vgId) {
2,548✔
692
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
693
    } else {
694
      vgId = pInfo->config.vgId;
2,548✔
695
    }
696
  }
697

698
  if (nDb > 0) {
637✔
699
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
637✔
700
    int32_t dbIdx = -1;
637✔
701
    for (int32_t i = 0; i < nVgCfg; ++i) {
3,185✔
702
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
2,548✔
703
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
2,548✔
704
      SMountDbInfo *pDbInfo = NULL;
2,548✔
705
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
2,548✔
706
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
1,274✔
707
        pDbInfo->dbId = pVgCfg->config.dbId;
1,274✔
708
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
1,274✔
709
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
1,274✔
710
      } else {
711
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
1,274✔
712
      }
713
      SMountVgInfo vgInfo = {
2,548✔
714
          .diskPrimary = pDiskPrimary->diskPrimary,
2,548✔
715
          .vgId = pVgCfg->config.vgId,
2,548✔
716
          .dbId = pVgCfg->config.dbId,
2,548✔
717
          .cacheLastSize = pVgCfg->config.cacheLastSize,
2,548✔
718
          .szPage = pVgCfg->config.szPage,
2,548✔
719
          .szCache = pVgCfg->config.szCache,
2,548✔
720
          .szBuf = pVgCfg->config.szBuf,
2,548✔
721
          .cacheLast = pVgCfg->config.cacheLast,
2,548✔
722
          .standby = pVgCfg->config.standby,
2,548✔
723
          .hashMethod = pVgCfg->config.hashMethod,
2,548✔
724
          .hashBegin = pVgCfg->config.hashBegin,
2,548✔
725
          .hashEnd = pVgCfg->config.hashEnd,
2,548✔
726
          .hashPrefix = pVgCfg->config.hashPrefix,
2,548✔
727
          .hashSuffix = pVgCfg->config.hashSuffix,
2,548✔
728
          .sttTrigger = pVgCfg->config.sttTrigger,
2,548✔
729
          .replications = pVgCfg->config.syncCfg.replicaNum,
2,548✔
730
          .precision = pVgCfg->config.tsdbCfg.precision,
2,548✔
731
          .compression = pVgCfg->config.tsdbCfg.compression,
2,548✔
732
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
2,548✔
733
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
2,548✔
734
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
2,548✔
735
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
2,548✔
736
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
2,548✔
737
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
2,548✔
738
          .minRows = pVgCfg->config.tsdbCfg.minRows,
2,548✔
739
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
2,548✔
740
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
2,548✔
741
          .ssChunkSize = pVgCfg->config.ssChunkSize,
2,548✔
742
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
2,548✔
743
          .ssCompact = pVgCfg->config.ssCompact,
2,548✔
744
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
2,548✔
745
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
2,548✔
746
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
2,548✔
747
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
2,548✔
748
          .walSegSize = pVgCfg->config.walCfg.segSize,
2,548✔
749
          .walLevel = pVgCfg->config.walCfg.level,
2,548✔
750
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
751
          .committed = pVgCfg->state.committed,
2,548✔
752
          .commitID = pVgCfg->state.commitID,
2,548✔
753
          .commitTerm = pVgCfg->state.commitTerm,
2,548✔
754
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
2,548✔
755
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
2,548✔
756
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
2,548✔
757
      };
758
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
5,096✔
759
    }
760
  }
761

762
  pMountInfo->pDbs = pDbInfos;
637✔
763

764
_exit:
958✔
765
  if (code != 0) {
958✔
766
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
321✔
767
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
768
  }
769
  taosArrayDestroy(pDiskPrimarys);
958✔
770
  taosArrayDestroy(pVgCfgs);
958✔
771
  taosMemoryFreeClear(pCfgs);
958✔
772
  TAOS_RETURN(code);
958✔
773
}
774

775
/**
776
 *   Retrieve the stables from vnode meta.
777
 */
778
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
637✔
779
  int32_t code = 0, lino = 0;
637✔
780
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
637✔
781
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
637✔
782
  SArray *suidList = NULL;
637✔
783
  SArray *pCols = NULL;
637✔
784
  SArray *pTags = NULL;
637✔
785
  SArray *pColExts = NULL;
637✔
786
  SArray *pTagExts = NULL;
637✔
787

788
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
637✔
789
  for (int32_t i = 0; i < nDb; ++i) {
1,911✔
790
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
1,274✔
791
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
1,274✔
792
    for (int32_t j = 0; j < nVg; ++j) {
1,274✔
793
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
1,274✔
794
      SVnode        vnode = {
1,274✔
795
                 .config.vgId = pVgInfo->vgId,
1,274✔
796
                 .config.dbId = pVgInfo->dbId,
1,274✔
797
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
1,274✔
798
                 .config.szPage = pVgInfo->szPage,
1,274✔
799
                 .config.szCache = pVgInfo->szCache,
1,274✔
800
                 .config.szBuf = pVgInfo->szBuf,
1,274✔
801
                 .config.cacheLast = pVgInfo->cacheLast,
1,274✔
802
                 .config.standby = pVgInfo->standby,
1,274✔
803
                 .config.hashMethod = pVgInfo->hashMethod,
1,274✔
804
                 .config.hashBegin = pVgInfo->hashBegin,
1,274✔
805
                 .config.hashEnd = pVgInfo->hashEnd,
1,274✔
806
                 .config.hashPrefix = pVgInfo->hashPrefix,
1,274✔
807
                 .config.hashSuffix = pVgInfo->hashSuffix,
1,274✔
808
                 .config.sttTrigger = pVgInfo->sttTrigger,
1,274✔
809
                 .config.syncCfg.replicaNum = pVgInfo->replications,
1,274✔
810
                 .config.tsdbCfg.precision = pVgInfo->precision,
1,274✔
811
                 .config.tsdbCfg.compression = pVgInfo->compression,
1,274✔
812
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
1,274✔
813
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
1,274✔
814
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
1,274✔
815
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
1,274✔
816
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
1,274✔
817
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
1,274✔
818
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
1,274✔
819
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
1,274✔
820
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
1,274✔
821
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
1,274✔
822
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
1,274✔
823
                 .config.ssCompact = pVgInfo->ssCompact,
1,274✔
824
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
1,274✔
825
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
1,274✔
826
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
1,274✔
827
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
1,274✔
828
                 .config.walCfg.segSize = pVgInfo->walSegSize,
1,274✔
829
                 .config.walCfg.level = pVgInfo->walLevel,
1,274✔
830
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
831
                 .diskPrimary = pVgInfo->diskPrimary,
1,274✔
832
      };
833
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
1,274✔
834
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
1,274✔
835
               pVgInfo->vgId);
836
      vnode.path = path;
1,274✔
837

838
      int32_t rollback = vnodeShouldRollback(&vnode);
1,274✔
839
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
1,274✔
840
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
841
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
842
        TAOS_CHECK_EXIT(code);
×
843
      } else {
844
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
1,274✔
845
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
846

847
        SMetaReader mr = {0};
1,274✔
848
        tb_uid_t    suid = 0;
1,274✔
849
        SMeta      *pMeta = vnode.pMeta;
1,274✔
850

851
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
1,274✔
852
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
1,274✔
853
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
854
        }
855
        taosArrayClear(suidList);
1,274✔
856
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
1,274✔
857
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,274✔
858
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
859
        int32_t nStbs = taosArrayGetSize(suidList);
1,274✔
860
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
1,274✔
861
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
862
        }
863
        for (int32_t i = 0; i < nStbs; ++i) {
5,096✔
864
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
3,822✔
865
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
3,822✔
866
                pVgInfo->dbId, suid, pReq->dnodeId);
867
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
3,822✔
868
            TSDB_CHECK_CODE(code, lino, _exit0);
×
869
          }
870
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
3,822✔
871
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
3,822✔
872
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
873
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
874
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
875
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
876
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
877
          }
878
          SMountStbInfo stbInfo = {
3,822✔
879
              .req.source = TD_REQ_FROM_APP,
880
              .req.suid = suid,
881
              .req.colVer = mr.me.stbEntry.schemaRow.version,
3,822✔
882
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
3,822✔
883
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
3,822✔
884
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
3,822✔
885
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
3,822✔
886
          };
887
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
3,822✔
888
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
3,822✔
889
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
890
          }
891
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
3,822✔
892
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
893
          }
894

895
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
3,822✔
896
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
897
          }
898
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
3,822✔
899
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
900
          }
901
          taosArrayClear(pCols);
3,822✔
902
          taosArrayClear(pTags);
3,822✔
903
          taosArrayClear(pColExts);
3,822✔
904
          taosArrayClear(pTagExts);
3,822✔
905
          stbInfo.req.pColumns = pCols;
3,822✔
906
          stbInfo.req.pTags = pTags;
3,822✔
907
          stbInfo.pColExts = pColExts;
3,822✔
908
          stbInfo.pTagExts = pTagExts;
3,822✔
909

910
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
22,932✔
911
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
19,110✔
912
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
19,110✔
913
            SFieldWithOptions col = {
19,110✔
914
                .type = pSchema->type,
19,110✔
915
                .flags = pSchema->flags,
19,110✔
916
                .bytes = pSchema->bytes,
19,110✔
917
                .compress = pColComp->alg,
19,110✔
918
            };
919
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
19,110✔
920
            if (pSchema->colId != pColComp->id) {
19,110✔
921
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
922
            }
923
            if (mr.me.pExtSchemas) {
19,110✔
924
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
925
            }
926
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
19,110✔
927
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
38,220✔
928
          }
929
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
8,918✔
930
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
5,096✔
931
            SField   tag = {
5,096✔
932
                  .type = pSchema->type,
5,096✔
933
                  .flags = pSchema->flags,
5,096✔
934
                  .bytes = pSchema->bytes,
5,096✔
935
            };
936
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
5,096✔
937
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
5,096✔
938
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
10,192✔
939
          }
940
          tDecoderClear(&mr.coder);
3,822✔
941

942
          // serialize the SMountStbInfo
943
          int32_t firstPartLen = 0;
3,822✔
944
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
3,822✔
945
          if (msgLen <= 0) {
3,822✔
946
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
947
          }
948
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
3,822✔
949
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
3,822✔
950
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
3,822✔
951
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
3,822✔
952
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
3,822✔
953
            taosMemoryFree(pBuf);
×
954
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
955
          }
956
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
7,644✔
957
            taosMemoryFree(pBuf);
×
958
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
959
          }
960
        }
961
      _exit0:
1,274✔
962
        metaReaderClear(&mr);
1,274✔
963
        metaClose(&vnode.pMeta);
1,274✔
964
        TAOS_CHECK_EXIT(code);
1,274✔
965
      }
966
      break;  // retrieve stbs from one vnode is enough
1,274✔
967
    }
968
  }
969
_exit:
637✔
970
  if (code != 0) {
637✔
971
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
972
           pReq->dnodeId, tstrerror(code), path);
973
  }
974
  taosArrayDestroy(suidList);
637✔
975
  taosArrayDestroy(pCols);
637✔
976
  taosArrayDestroy(pTags);
637✔
977
  taosArrayDestroy(pColExts);
637✔
978
  taosArrayDestroy(pTagExts);
637✔
979
  TAOS_RETURN(code);
637✔
980
}
981

982
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
2,872✔
983
  int32_t code = 0, lino = 0;
2,872✔
984
  int32_t retryTimes = 0;
2,872✔
985
  char    filepath[PATH_MAX] = {0};
2,872✔
986
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
2,872✔
987
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
2,872✔
988
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
989
                  code, lino, _exit, terrno);
990
  int32_t ret = 0;
2,872✔
991
  do {
992
    ret = taosLockFile(*pFile);
3,514✔
993
    if (ret == 0) break;
3,514✔
994
    taosMsleep(1000);
963✔
995
    ++retryTimes;
963✔
996
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
963✔
997
  } while (retryTimes < retryLimit);
963✔
998
  TAOS_CHECK_EXIT(ret);
2,872✔
999
_exit:
2,872✔
1000
  if (code != 0) {
2,872✔
1001
    (void)taosCloseFile(pFile);
321✔
1002
    *pFile = NULL;
321✔
1003
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
321✔
1004
           filepath);
1005
  }
1006
  TAOS_RETURN(code);
2,872✔
1007
}
1008

1009
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
1,921✔
1010
  int32_t code = 0, lino = 0;
1,921✔
1011
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
1,921✔
1012
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
1,921✔
1013
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
1,600✔
1014
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
1,279✔
1015
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
1,279✔
1016
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
958✔
1017
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
958✔
1018
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
958✔
1019
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
958✔
1020
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
958✔
1021
           TD_DIRSEP);
1022
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
958✔
1023
_exit:
1,921✔
1024
  if (code != 0) {
1,921✔
1025
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
963✔
1026
           pReq->dnodeId, tstrerror(code), path);
1027
  }
1028
  TAOS_RETURN(code);
1,921✔
1029
}
1030

1031
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
1,921✔
1032
                                       SMountInfo *pMountInfo) {
1033
  int32_t code = 0, lino = 0;
1,921✔
1034
  pMountInfo->dnodeId = pReq->dnodeId;
1,921✔
1035
  pMountInfo->mountUid = pReq->mountUid;
1,921✔
1036
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
1,921✔
1037
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
1,921✔
1038
  pMountInfo->ignoreExist = pReq->ignoreExist;
1,921✔
1039
  pMountInfo->valLen = pReq->valLen;
1,921✔
1040
  pMountInfo->pVal = pReq->pVal;
1,921✔
1041
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
1,921✔
1042
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
958✔
1043
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
958✔
1044
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
637✔
1045
_exit:
637✔
1046
  if (code != 0) {
1,921✔
1047
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
1,284✔
1048
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1049
  }
1050
  TAOS_RETURN(code);
1,921✔
1051
}
1052

1053
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,921✔
1054
  int32_t               code = 0, lino = 0;
1,921✔
1055
  int32_t               rspCode = 0;
1,921✔
1056
  SVnodeMgmt            vndMgmt = {0};
1,921✔
1057
  SMountInfo            mountInfo = {0};
1,921✔
1058
  void                 *pBuf = NULL;
1,921✔
1059
  int32_t               bufLen = 0;
1,921✔
1060
  SRetrieveMountPathReq req = {0};
1,921✔
1061

1062
  vndMgmt = *pMgmt;
1,921✔
1063
  vndMgmt.path = NULL;
1,921✔
1064
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
1,921✔
1065
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
1,921✔
1066
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
1,921✔
1067
_end:
1,921✔
1068
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
1,921✔
1069
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
1,921✔
1070
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
1,921✔
1071
  pMsg->info.rsp = pBuf;
1,921✔
1072
  pMsg->info.rspLen = bufLen;
1,921✔
1073
_exit:
1,921✔
1074
  if (rspCode != 0) {
1,921✔
1075
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1076
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1077
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1078
    rpcFreeCont(pBuf);
×
1079
    code = rspCode;
×
1080
  } else if (code != 0) {
1,921✔
1081
    // the client would receive the response with error msg
1082
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
1,284✔
1083
           req.dnodeId, tstrerror(code), req.mountPath);
1084
  } else {
1085
    int32_t nVgs = 0;
637✔
1086
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
637✔
1087
    for (int32_t i = 0; i < nDbs; ++i) {
1,911✔
1088
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
1,274✔
1089
      nVgs += taosArrayGetSize(pDb->pVgs);
1,274✔
1090
    }
1091
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
637✔
1092
  }
1093
  taosMemFreeClear(vndMgmt.path);
1,921✔
1094
  tFreeMountInfo(&mountInfo, false);
1,921✔
1095
  TAOS_RETURN(code);
1,921✔
1096
}
1097

1098
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
2,548✔
1099
                            SMountVnodeReq *req, STfs *pMountTfs) {
1100
  int32_t    code = 0;
2,548✔
1101
  SVnodeInfo info = {0};
2,548✔
1102
  char       hostDir[TSDB_FILENAME_LEN] = {0};
2,548✔
1103
  char       mountDir[TSDB_FILENAME_LEN] = {0};
2,548✔
1104
  char       mountVnode[32] = {0};
2,548✔
1105

1106
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
2,548✔
1107
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1108
    return code;
×
1109
  }
1110

1111
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
2,548✔
1112
  if ((code = taosMkDir(hostDir))) {
2,548✔
1113
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1114
           tstrerror(code), hostDir);
1115
    return code;
×
1116
  }
1117

1118
  info.config = *pCfg;  // copy the config
2,548✔
1119
  info.state.committed = req->committed;
2,548✔
1120
  info.state.commitID = req->commitID;
2,548✔
1121
  info.state.commitTerm = req->commitTerm;
2,548✔
1122
  info.state.applied = req->committed;
2,548✔
1123
  info.state.applyTerm = req->commitTerm;
2,548✔
1124
  info.config.vndStats.numOfSTables = req->numOfSTables;
2,548✔
1125
  info.config.vndStats.numOfCTables = req->numOfCTables;
2,548✔
1126
  info.config.vndStats.numOfNTables = req->numOfNTables;
2,548✔
1127

1128
  SVnodeInfo oldInfo = {0};
2,548✔
1129
  oldInfo.config = vnodeCfgDefault;
2,548✔
1130
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
2,548✔
1131
    if (oldInfo.config.dbId != info.config.dbId) {
×
1132
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1133
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1134
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1135
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1136
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1137
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1138

1139
    } else {
1140
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1141
    }
1142
    return code;
×
1143
  }
1144

1145
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
2,548✔
1146
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
2,548✔
1147
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
2,548✔
1148
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
2,548✔
1149
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1150
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
15,288✔
1151
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
12,740✔
1152
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
12,740✔
1153
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
12,740✔
1154
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1155
             mountSubDir, hostSubDir, tstrerror(code));
1156
      return code;
×
1157
    }
1158
  }
1159
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
2,548✔
1160
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
2,548✔
1161
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1162
           req->mountName, tstrerror(code), hostDir);
1163
    return code;
×
1164
  }
1165
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
2,548✔
1166
  return 0;
2,548✔
1167
}
1168

1169
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,548✔
1170
  int32_t          code = 0, lino = 0;
2,548✔
1171
  SMountVnodeReq   req = {0};
2,548✔
1172
  SCreateVnodeReq *pCreateReq = &req.createReq;
2,548✔
1173
  SVnodeCfg        vnodeCfg = {0};
2,548✔
1174
  SWrapperCfg      wrapperCfg = {0};
2,548✔
1175
  SVnode          *pImpl = NULL;
2,548✔
1176
  STfs            *pMountTfs = NULL;
2,548✔
1177
  char             path[TSDB_FILENAME_LEN] = {0};
2,548✔
1178
  bool             releaseTfs = false;
2,548✔
1179

1180
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,548✔
1181
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1182
    return TSDB_CODE_INVALID_MSG;
×
1183
  }
1184

1185
  if (pCreateReq->learnerReplica == 0) {
2,548✔
1186
    pCreateReq->learnerSelfIndex = -1;
2,548✔
1187
  }
1188
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
5,096✔
1189
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
2,548✔
1190
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1191
  }
1192
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
2,548✔
1193
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1194
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1195
  }
1196

1197
  SReplica *pReplica = NULL;
2,548✔
1198
  if (pCreateReq->selfIndex != -1) {
2,548✔
1199
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
2,548✔
1200
  } else {
1201
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1202
  }
1203
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
2,548✔
1204
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
2,548✔
1205
    (void)tFreeSMountVnodeReq(&req);
×
1206
    code = TSDB_CODE_INVALID_MSG;
×
1207
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1208
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1209
    return code;
×
1210
  }
1211
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
2,548✔
1212
  vnodeCfg.mountVgId = req.mountVgId;
2,548✔
1213
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
2,548✔
1214
  wrapperCfg.mountId = req.mountId;
2,548✔
1215

1216
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
2,548✔
1217
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
2,548✔
1218
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1219
    (void)tFreeSMountVnodeReq(&req);
×
1220
    vmReleaseVnode(pMgmt, pVnode);
×
1221
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1222
    return 0;
×
1223
  }
1224
  vmReleaseVnode(pMgmt, pVnode);
2,548✔
1225

1226
  wrapperCfg.diskPrimary = req.diskPrimary;
2,548✔
1227
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
2,548✔
1228
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
2,548✔
1229
  releaseTfs = true;
2,548✔
1230

1231
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
2,548✔
1232
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
2,548✔
1233
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1234
  }
1235
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
2,548✔
1236
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1237
  }
1238
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
2,548✔
1239
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
2,548✔
1240
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
2,548✔
1241
_exit:
2,548✔
1242
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
2,548✔
1243
  if (code != 0) {
2,548✔
1244
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1245
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1246
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1247
    vnodeClose(pImpl);
×
1248
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1249
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1250
  } else {
1251
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
2,548✔
1252
          TMSG_INFO(pMsg->msgType));
1253
  }
1254

1255
  pMsg->code = code;
2,548✔
1256
  pMsg->info.rsp = NULL;
2,548✔
1257
  pMsg->info.rspLen = 0;
2,548✔
1258

1259
  (void)tFreeSMountVnodeReq(&req);
2,548✔
1260
  TAOS_RETURN(code);
2,548✔
1261
}
1262
#endif  // USE_MOUNT
1263

1264
// alter replica doesn't use this, but restore dnode still use this
1265
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,812,263✔
1266
  SAlterVnodeTypeReq req = {0};
1,812,263✔
1267
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
1,812,263✔
1268
    terrno = TSDB_CODE_INVALID_MSG;
×
1269
    return -1;
×
1270
  }
1271

1272
  if (req.learnerReplicas == 0) {
1273
    req.learnerSelfIndex = -1;
1274
  }
1275

1276
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
1,812,263✔
1277
        TMSG_INFO(pMsg->msgType));
1278

1279
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
1,812,263✔
1280
  if (pVnode == NULL) {
1,812,263✔
1281
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1282
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1283
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1284
    return -1;
×
1285
  }
1286

1287
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
1,812,263✔
1288
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
1,812,263✔
1289
  if (role == TAOS_SYNC_ROLE_VOTER) {
1,812,263✔
1290
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1291
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1292
    vmReleaseVnode(pMgmt, pVnode);
×
1293
    return -1;
×
1294
  }
1295

1296
  dInfo("vgId:%d, checking node catch up", req.vgId);
1,812,263✔
1297
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
1,812,263✔
1298
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
1,718,800✔
1299
    vmReleaseVnode(pMgmt, pVnode);
1,718,800✔
1300
    return -1;
1,718,800✔
1301
  }
1302

1303
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
93,463✔
1304

1305
  int32_t vgId = req.vgId;
93,463✔
1306
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
93,463✔
1307
        req.selfIndex, req.strict, req.changeVersion);
1308
  for (int32_t i = 0; i < req.replica; ++i) {
372,628✔
1309
    SReplica *pReplica = &req.replicas[i];
279,165✔
1310
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
279,165✔
1311
  }
1312
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
93,463✔
1313
    SReplica *pReplica = &req.learnerReplicas[i];
×
1314
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1315
  }
1316

1317
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
93,463✔
1318
      req.learnerSelfIndex >= req.learnerReplica) {
93,463✔
1319
    terrno = TSDB_CODE_INVALID_MSG;
×
1320
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1321
    vmReleaseVnode(pMgmt, pVnode);
×
1322
    return -1;
×
1323
  }
1324

1325
  SReplica *pReplica = NULL;
93,463✔
1326
  if (req.selfIndex != -1) {
93,463✔
1327
    pReplica = &req.replicas[req.selfIndex];
93,463✔
1328
  } else {
1329
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1330
  }
1331

1332
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
93,463✔
1333
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
93,463✔
1334
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1335
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1336
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1337
    vmReleaseVnode(pMgmt, pVnode);
×
1338
    return -1;
×
1339
  }
1340

1341
  dInfo("vgId:%d, start to close vnode", vgId);
93,463✔
1342
  SWrapperCfg wrapperCfg = {
93,463✔
1343
      .dropped = pVnode->dropped,
93,463✔
1344
      .vgId = pVnode->vgId,
93,463✔
1345
      .vgVersion = pVnode->vgVersion,
93,463✔
1346
      .diskPrimary = pVnode->diskPrimary,
93,463✔
1347
  };
1348
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
93,463✔
1349

1350
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
93,463✔
1351
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
93,463✔
1352

1353
  int32_t diskPrimary = wrapperCfg.diskPrimary;
93,463✔
1354
  char    path[TSDB_FILENAME_LEN] = {0};
93,463✔
1355
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
93,463✔
1356

1357
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
93,463✔
1358
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
93,463✔
1359
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1360
    return -1;
×
1361
  }
1362

1363
  dInfo("vgId:%d, begin to open vnode", vgId);
93,463✔
1364
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
93,463✔
1365
  if (pImpl == NULL) {
93,463✔
1366
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1367
    return -1;
×
1368
  }
1369

1370
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
93,463✔
1371
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1372
    return -1;
×
1373
  }
1374

1375
  if (vnodeStart(pImpl) != 0) {
93,463✔
1376
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1377
    return -1;
×
1378
  }
1379

1380
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
93,463✔
1381
        req.vgId, TMSG_INFO(pMsg->msgType));
1382
  return 0;
93,463✔
1383
}
1384

1385
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1386
  SCheckLearnCatchupReq req = {0};
×
1387
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1388
    terrno = TSDB_CODE_INVALID_MSG;
×
1389
    return -1;
×
1390
  }
1391

1392
  if (req.learnerReplicas == 0) {
1393
    req.learnerSelfIndex = -1;
1394
  }
1395

1396
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1397
        TMSG_INFO(pMsg->msgType));
1398

1399
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1400
  if (pVnode == NULL) {
×
1401
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1402
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1403
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1404
    return -1;
×
1405
  }
1406

1407
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1408
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1409
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1410
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1411
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1412
    vmReleaseVnode(pMgmt, pVnode);
×
1413
    return -1;
×
1414
  }
1415

1416
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1417
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1418
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1419
    vmReleaseVnode(pMgmt, pVnode);
×
1420
    return -1;
×
1421
  }
1422

1423
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1424

1425
  vmReleaseVnode(pMgmt, pVnode);
×
1426

1427
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1428
        TMSG_INFO(pMsg->msgType));
1429

1430
  return 0;
×
1431
}
1432

1433
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
21,312✔
1434
  SDisableVnodeWriteReq req = {0};
21,312✔
1435
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
21,312✔
1436
    terrno = TSDB_CODE_INVALID_MSG;
×
1437
    return -1;
×
1438
  }
1439

1440
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
21,312✔
1441

1442
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
21,312✔
1443
  if (pVnode == NULL) {
21,312✔
1444
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1445
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1446
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1447
    return -1;
×
1448
  }
1449

1450
  pVnode->disable = req.disable;
21,312✔
1451
  vmReleaseVnode(pMgmt, pVnode);
21,312✔
1452
  return 0;
21,312✔
1453
}
1454

1455
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,528✔
1456
  SMsgHead *pHead = pMsg->pCont;
3,528✔
1457
  pHead->contLen = ntohl(pHead->contLen);
3,528✔
1458
  pHead->vgId = ntohl(pHead->vgId);
3,528✔
1459

1460
  SVndSetKeepVersionReq req = {0};
3,528✔
1461
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
3,528✔
1462
                                        &req) != 0) {
1463
    terrno = TSDB_CODE_INVALID_MSG;
×
1464
    return -1;
×
1465
  }
1466

1467
  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);
3,528✔
1468

1469
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
3,528✔
1470
  if (pVnode == NULL) {
3,528✔
1471
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
×
1472
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1473
    return -1;
×
1474
  }
1475

1476
  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
1477
  // This bypasses Raft to avoid timing issues where WAL might be deleted
1478
  // before keepVersion is set through the Raft consensus process
1479
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
3,528✔
1480
  if (code != TSDB_CODE_SUCCESS) {
3,528✔
1481
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
×
1482
    terrno = code;
×
1483
    vmReleaseVnode(pMgmt, pVnode);
×
1484
    return -1;
×
1485
  }
1486

1487
  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);
3,528✔
1488

1489
  vmReleaseVnode(pMgmt, pVnode);
3,528✔
1490
  return 0;
3,528✔
1491
}
1492

1493
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
20,582✔
1494
  SAlterVnodeHashRangeReq req = {0};
20,582✔
1495
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
20,582✔
1496
    terrno = TSDB_CODE_INVALID_MSG;
×
1497
    return -1;
×
1498
  }
1499

1500
  int32_t srcVgId = req.srcVgId;
20,582✔
1501
  int32_t dstVgId = req.dstVgId;
20,582✔
1502

1503
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
20,582✔
1504
  if (pVnode != NULL) {
20,582✔
1505
    dError("vgId:%d, vnode already exist", dstVgId);
×
1506
    vmReleaseVnode(pMgmt, pVnode);
×
1507
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1508
    return -1;
×
1509
  }
1510

1511
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
20,582✔
1512
        req.dstVgId);
1513
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
20,582✔
1514
  if (pVnode == NULL) {
20,582✔
1515
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1516
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1517
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1518
    return -1;
×
1519
  }
1520

1521
  SWrapperCfg wrapperCfg = {
20,582✔
1522
      .dropped = pVnode->dropped,
20,582✔
1523
      .vgId = dstVgId,
1524
      .vgVersion = pVnode->vgVersion,
20,582✔
1525
      .diskPrimary = pVnode->diskPrimary,
20,582✔
1526
  };
1527
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
20,582✔
1528

1529
  // prepare alter
1530
  pVnode->toVgId = dstVgId;
20,582✔
1531
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
20,582✔
1532
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1533
    return -1;
×
1534
  }
1535

1536
  dInfo("vgId:%d, close vnode", srcVgId);
20,582✔
1537
  vmCloseVnode(pMgmt, pVnode, true, false);
20,582✔
1538

1539
  int32_t diskPrimary = wrapperCfg.diskPrimary;
20,582✔
1540
  char    srcPath[TSDB_FILENAME_LEN] = {0};
20,582✔
1541
  char    dstPath[TSDB_FILENAME_LEN] = {0};
20,582✔
1542
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
20,582✔
1543
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
20,582✔
1544

1545
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
20,582✔
1546
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
20,582✔
1547
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1548
    return -1;
×
1549
  }
1550

1551
  dInfo("vgId:%d, open vnode", dstVgId);
20,582✔
1552
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
20,582✔
1553

1554
  if (pImpl == NULL) {
20,582✔
1555
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1556
    return -1;
×
1557
  }
1558

1559
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
20,582✔
1560
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1561
    return -1;
×
1562
  }
1563

1564
  if (vnodeStart(pImpl) != 0) {
20,582✔
1565
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1566
    return -1;
×
1567
  }
1568

1569
  // complete alter
1570
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
20,582✔
1571
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1572
    return -1;
×
1573
  }
1574

1575
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
20,582✔
1576
  return 0;
20,582✔
1577
}
1578

1579
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
642,954✔
1580
  SAlterVnodeReplicaReq alterReq = {0};
642,954✔
1581
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
642,954✔
1582
    terrno = TSDB_CODE_INVALID_MSG;
×
1583
    return -1;
×
1584
  }
1585

1586
  if (alterReq.learnerReplica == 0) {
642,954✔
1587
    alterReq.learnerSelfIndex = -1;
462,134✔
1588
  }
1589

1590
  int32_t vgId = alterReq.vgId;
642,954✔
1591
  dInfo(
642,954✔
1592
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1593
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1594
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1595
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1596

1597
  for (int32_t i = 0; i < alterReq.replica; ++i) {
2,495,590✔
1598
    SReplica *pReplica = &alterReq.replicas[i];
1,852,636✔
1599
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
1,852,636✔
1600
  }
1601
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
823,774✔
1602
    SReplica *pReplica = &alterReq.learnerReplicas[i];
180,820✔
1603
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
180,820✔
1604
  }
1605

1606
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
642,954✔
1607
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
642,954✔
1608
    terrno = TSDB_CODE_INVALID_MSG;
×
1609
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1610
    return -1;
×
1611
  }
1612

1613
  SReplica *pReplica = NULL;
642,954✔
1614
  if (alterReq.selfIndex != -1) {
642,954✔
1615
    pReplica = &alterReq.replicas[alterReq.selfIndex];
642,954✔
1616
  } else {
1617
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1618
  }
1619

1620
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
642,954✔
1621
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
642,954✔
1622
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1623
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
×
1624
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1625
    return -1;
×
1626
  }
1627

1628
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
642,954✔
1629
  if (pVnode == NULL) {
642,954✔
1630
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1631
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1632
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1633
    return -1;
×
1634
  }
1635

1636
  dInfo("vgId:%d, start to close vnode", vgId);
642,954✔
1637
  SWrapperCfg wrapperCfg = {
642,954✔
1638
      .dropped = pVnode->dropped,
642,954✔
1639
      .vgId = pVnode->vgId,
642,954✔
1640
      .vgVersion = pVnode->vgVersion,
642,954✔
1641
      .diskPrimary = pVnode->diskPrimary,
642,954✔
1642
  };
1643
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
642,954✔
1644

1645
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
642,954✔
1646
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
642,954✔
1647

1648
  int32_t diskPrimary = wrapperCfg.diskPrimary;
642,954✔
1649
  char    path[TSDB_FILENAME_LEN] = {0};
642,954✔
1650
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
642,954✔
1651

1652
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
642,954✔
1653
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
642,954✔
1654
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1655
    return -1;
×
1656
  }
1657

1658
  dInfo("vgId:%d, begin to open vnode", vgId);
642,954✔
1659
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
642,954✔
1660
  if (pImpl == NULL) {
642,954✔
1661
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1662
    return -1;
×
1663
  }
1664

1665
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
642,954✔
1666
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1667
    return -1;
×
1668
  }
1669

1670
  if (vnodeStart(pImpl) != 0) {
642,954✔
1671
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1672
    return -1;
×
1673
  }
1674

1675
  dInfo(
642,954✔
1676
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1677
      "learnerSelfIndex:%d strict:%d",
1678
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1679
      alterReq.learnerSelfIndex, alterReq.strict);
1680
  return 0;
642,954✔
1681
}
1682

1683
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
34,828✔
1684
  SAlterVnodeElectBaselineReq alterReq = {0};
34,828✔
1685
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
34,828✔
1686
    return TSDB_CODE_INVALID_MSG;
×
1687
  }
1688

1689
  int32_t vgId = alterReq.vgId;
34,828✔
1690
  dInfo(
34,828✔
1691
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
1692
      vgId, TMSG_INFO(pMsg->msgType), alterReq.electBaseLine);
1693

1694
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
34,828✔
1695
  if (pVnode == NULL) {
34,828✔
1696
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1697
    return terrno;
×
1698
  }
1699

1700
  if(vnodeSetElectBaseline(pVnode->pImpl, alterReq.electBaseLine) != 0){
34,828✔
1701
    vmReleaseVnode(pMgmt, pVnode);
×
1702
    return -1;
×
1703
  }
1704

1705
  vmReleaseVnode(pMgmt, pVnode);
34,828✔
1706
  return 0;
34,828✔
1707
}
1708

1709
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,600,206✔
1710
  int32_t       code = 0;
1,600,206✔
1711
  SDropVnodeReq dropReq = {0};
1,600,206✔
1712
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
1,600,206✔
1713
    terrno = TSDB_CODE_INVALID_MSG;
×
1714
    return terrno;
×
1715
  }
1716

1717
  int32_t vgId = dropReq.vgId;
1,600,206✔
1718
  dInfo("vgId:%d, start to drop vnode", vgId);
1,600,206✔
1719

1720
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
1,600,206✔
1721
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1722
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
×
1723
    return terrno;
×
1724
  }
1725

1726
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
1,600,206✔
1727
  if (pVnode == NULL) {
1,600,206✔
1728
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1729
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1730
    return terrno;
×
1731
  }
1732

1733
  pVnode->dropped = 1;
1,600,206✔
1734
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
1,600,206✔
1735
    pVnode->dropped = 0;
×
1736
    vmReleaseVnode(pMgmt, pVnode);
×
1737
    return code;
×
1738
  }
1739

1740
  vmCloseVnode(pMgmt, pVnode, false, false);
1,600,206✔
1741
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
1,600,206✔
1742
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1743
  }
1744

1745
  dInfo("vgId:%d, is dropped", vgId);
1,600,206✔
1746
  return 0;
1,600,206✔
1747
}
1748

1749
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
107,391✔
1750
  SVArbHeartBeatReq arbHbReq = {0};
107,391✔
1751
  SVArbHeartBeatRsp arbHbRsp = {0};
107,391✔
1752
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
107,391✔
1753
    terrno = TSDB_CODE_INVALID_MSG;
×
1754
    return -1;
×
1755
  }
1756

1757
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
107,391✔
1758
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1759
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
×
1760
    goto _OVER;
×
1761
  }
1762

1763
  if (strlen(arbHbReq.arbToken) == 0) {
107,391✔
1764
    terrno = TSDB_CODE_INVALID_MSG;
×
1765
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1766
    goto _OVER;
×
1767
  }
1768

1769
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
107,391✔
1770

1771
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
107,391✔
1772
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
107,391✔
1773
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
107,391✔
1774
  if (arbHbRsp.hbMembers == NULL) {
107,391✔
1775
    goto _OVER;
×
1776
  }
1777

1778
  for (int32_t i = 0; i < size; i++) {
227,651✔
1779
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
120,260✔
1780
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
120,260✔
1781
    if (pVnode == NULL) {
120,260✔
1782
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
25,550✔
1783
      continue;
25,550✔
1784
    }
1785

1786
    SVArbHbRspMember rspMember = {0};
94,710✔
1787
    rspMember.vgId = pReqMember->vgId;
94,710✔
1788
    rspMember.hbSeq = pReqMember->hbSeq;
94,710✔
1789
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
94,710✔
1790
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1791
      vmReleaseVnode(pMgmt, pVnode);
×
1792
      continue;
×
1793
    }
1794

1795
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
94,710✔
1796
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1797
      vmReleaseVnode(pMgmt, pVnode);
×
1798
      continue;
×
1799
    }
1800

1801
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
189,420✔
1802
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1803
      vmReleaseVnode(pMgmt, pVnode);
×
1804
      goto _OVER;
×
1805
    }
1806

1807
    vmReleaseVnode(pMgmt, pVnode);
94,710✔
1808
  }
1809

1810
  SRpcMsg rspMsg = {.info = pMsg->info};
107,391✔
1811
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
107,391✔
1812
  if (rspLen < 0) {
107,391✔
1813
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1814
    goto _OVER;
×
1815
  }
1816

1817
  void *pRsp = rpcMallocCont(rspLen);
107,391✔
1818
  if (pRsp == NULL) {
107,391✔
1819
    terrno = terrno;
×
1820
    goto _OVER;
×
1821
  }
1822

1823
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
107,391✔
1824
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1825
    rpcFreeCont(pRsp);
×
1826
    goto _OVER;
×
1827
  }
1828
  pMsg->info.rsp = pRsp;
107,391✔
1829
  pMsg->info.rspLen = rspLen;
107,391✔
1830

1831
  terrno = TSDB_CODE_SUCCESS;
107,391✔
1832

1833
_OVER:
107,391✔
1834
  tFreeSVArbHeartBeatReq(&arbHbReq);
107,391✔
1835
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
107,391✔
1836
  return terrno;
107,391✔
1837
}
1838

1839
SArray *vmGetMsgHandles() {
678,342✔
1840
  int32_t code = -1;
678,342✔
1841
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
678,342✔
1842
  if (pArray == NULL) goto _OVER;
678,342✔
1843

1844
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1845
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
678,342✔
1846
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
678,342✔
1847
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
678,342✔
1848
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1849
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1850
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1851
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1852
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1853
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1854
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1855
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1856
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1857
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1858
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1859
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1860
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1861
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1862
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1863
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1864
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1865
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1866
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1868
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1869
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1870
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1871
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1872
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1873
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1874
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1875
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
678,342✔
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
678,342✔
1877
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1878
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1881
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1882
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1883
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1884
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1885
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1886
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1887
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1891
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1892
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1893
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1894
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1895

1896
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1899
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1900
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1901
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1902
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1903
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1904
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1905
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1906
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1907
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
678,342✔
1908
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1909
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1910
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1911
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1912
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1913
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1914
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1915
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1916
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1917
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1918
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1919

1920
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1921
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1922
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1923
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1924
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1925
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1926
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1927
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1928
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1929
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1930
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1931
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1932

1933
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
678,342✔
1934
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
678,342✔
1935
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
678,342✔
1936
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
678,342✔
1937
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
678,342✔
1938

1939
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
678,342✔
1940
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
678,342✔
1941
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
678,342✔
1942

1943
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
678,342✔
1944
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
678,342✔
1945
  code = 0;
678,342✔
1946

1947
_OVER:
678,342✔
1948
  if (code != 0) {
678,342✔
1949
    taosArrayDestroy(pArray);
×
1950
    return NULL;
×
1951
  } else {
1952
    return pArray;
678,342✔
1953
  }
1954
}
1955

1956
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1957
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1958

1959
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1960
  while (pIter) {
×
1961
    SVnodeObj **ppVnode = pIter;
×
1962
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1963
      continue;
×
1964
    }
1965

1966
    SVnodeObj *pVnode = *ppVnode;
×
1967
    if (!pVnode->failed) {
×
1968
      SRawWriteMetrics metrics = {0};
×
1969
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1970
        // Add the metrics to the global metrics system with cluster ID
1971
        SName   name = {0};
×
1972
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1973
        if (code < 0) {
×
1974
          dError("failed to get db name since %s", tstrerror(code));
×
1975
          continue;
×
1976
        }
1977
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1978
        if (code != TSDB_CODE_SUCCESS) {
×
1979
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1980
        } else {
1981
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1982
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1983
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1984
          }
1985
        }
1986
      } else {
1987
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1988
      }
1989
    }
1990
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1991
  }
1992

1993
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1994
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc