• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4909

30 Dec 2025 10:52AM UTC coverage: 65.542% (+0.2%) from 65.386%
#4909

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

857 existing lines in 113 files now uncovered.

193924 of 295877 relevant lines covered (65.54%)

120594206.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.43
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22
#include "tencrypt.h"
23

24
extern taos_counter_t *tsInsertCounter;
25

26
// Forward declaration for function defined in metrics.c
27
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
28
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
29

30
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
54,547,121✔
31
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
54,547,121✔
32
  if (pInfo->pVloads == NULL) return;
54,547,121✔
33

34
  tfsUpdateSize(pMgmt->pTfs);
54,547,121✔
35

36
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
54,547,121✔
37

38
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
54,547,121✔
39
  while (pIter) {
211,826,086✔
40
    SVnodeObj **ppVnode = pIter;
157,278,965✔
41
    if (ppVnode == NULL || *ppVnode == NULL) continue;
157,278,965✔
42

43
    SVnodeObj *pVnode = *ppVnode;
157,278,965✔
44
    SVnodeLoad vload = {.vgId = pVnode->vgId};
157,278,965✔
45
    if (!pVnode->failed) {
157,278,965✔
46
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
157,278,965✔
47
        dError("failed to get vnode load");
×
48
      }
49
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
157,278,965✔
50
    }
51
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
314,557,930✔
52
      dError("failed to push vnode load");
×
53
    }
54
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
157,278,965✔
55
  }
56

57
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
54,547,121✔
58
}
59

60
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
×
61
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
62

63
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
64
  while (pIter) {
×
65
    SVnodeObj **ppVnode = pIter;
×
66
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
67

68
    SVnodeObj *pVnode = *ppVnode;
×
69

70
    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
×
71
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
×
72
    }
73
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
74
  }
75

76
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
77
}
×
78

79
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
80
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
81
  if (!pInfo->pVloads) return;
×
82

83
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
84

85
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
86
  while (pIter) {
×
87
    SVnodeObj **ppVnode = pIter;
×
88
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
89

90
    SVnodeObj *pVnode = *ppVnode;
×
91
    if (!pVnode->failed) {
×
92
      SVnodeLoadLite vload = {0};
×
93
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
94
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
95
          taosArrayDestroy(pInfo->pVloads);
×
96
          pInfo->pVloads = NULL;
×
97
          break;
×
98
        }
99
      }
100
    }
101
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
102
  }
103

104
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
105
}
106

107
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
59✔
108
  SMonVloadInfo vloads = {0};
59✔
109
  vmGetVnodeLoads(pMgmt, &vloads, true);
59✔
110

111
  SArray *pVloads = vloads.pVloads;
59✔
112
  if (pVloads == NULL) return;
59✔
113

114
  int32_t totalVnodes = 0;
59✔
115
  int32_t masterNum = 0;
59✔
116
  int64_t numOfSelectReqs = 0;
59✔
117
  int64_t numOfInsertReqs = 0;
59✔
118
  int64_t numOfInsertSuccessReqs = 0;
59✔
119
  int64_t numOfBatchInsertReqs = 0;
59✔
120
  int64_t numOfBatchInsertSuccessReqs = 0;
59✔
121

122
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
177✔
123
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
118✔
124
    numOfSelectReqs += pLoad->numOfSelectReqs;
118✔
125
    numOfInsertReqs += pLoad->numOfInsertReqs;
118✔
126
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
118✔
127
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
118✔
128
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
118✔
129
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
118✔
130
      masterNum++;
118✔
131
    }
132
    totalVnodes++;
118✔
133
  }
134

135
  pInfo->vstat.totalVnodes = totalVnodes;
59✔
136
  pInfo->vstat.masterNum = masterNum;
59✔
137
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
59✔
138
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
59✔
139
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
59✔
140
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
59✔
141
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
59✔
142
  pMgmt->state.totalVnodes = totalVnodes;
59✔
143
  pMgmt->state.masterNum = masterNum;
59✔
144
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
59✔
145
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
59✔
146
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
59✔
147
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
59✔
148
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
59✔
149

150
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
59✔
151
    dError("failed to get tfs monitor info");
×
152
  }
153
  taosArrayDestroy(pVloads);
59✔
154
}
155

156
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
59✔
157
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
59✔
158
  if (list_size == 0) return;
59✔
159
  int32_t *vgroup_ids;
×
160
  char   **keys;
×
161
  int      r = 0;
×
162
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
163
  if (r) {
×
164
    dError("failed to get vgroup ids");
×
165
    return;
×
166
  }
167
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
168
  for (int i = 0; i < list_size; i++) {
×
169
    int32_t vgroup_id = vgroup_ids[i];
×
170
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
171
    if (vnode == NULL) {
×
172
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
173
      if (r) {
×
174
        dError("failed to delete monitor sample key:%s", keys[i]);
×
175
      }
176
    }
177
  }
178
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
179
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
180
  if (keys) taosMemoryFree(keys);
×
181
  return;
×
182
}
183

184
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
185
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
186
    return;
×
187
  }
188

189
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
190
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
191
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
192
  if (pValidVgroups == NULL) {
×
193
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
194
    return;
×
195
  }
196

197
  while (pIter != NULL) {
×
198
    SVnodeObj **ppVnode = pIter;
×
199
    if (ppVnode && *ppVnode) {
×
200
      int32_t vgId = (*ppVnode)->vgId;
×
201
      char    dummy = 1;  // hash table value (we only care about the key)
×
202
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
203
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
204
      }
205
    }
206
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
207
  }
208
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
209

210
  // Clean expired metrics by removing metrics for non-existent vgroups
211
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
212
  if (code != TSDB_CODE_SUCCESS) {
×
213
    dError("failed to clean expired metrics, code:%d", code);
×
214
  }
215

216
  taosHashCleanup(pValidVgroups);
×
217
}
218

219
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
2,591,054✔
220
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
2,591,054✔
221

222
  pCfg->vgId = pCreate->vgId;
2,591,054✔
223
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
2,590,675✔
224
  pCfg->dbId = pCreate->dbUid;
2,589,305✔
225
  pCfg->szPage = pCreate->pageSize * 1024;
2,590,327✔
226
  pCfg->szCache = pCreate->pages;
2,588,348✔
227
  pCfg->cacheLast = pCreate->cacheLast;
2,590,286✔
228
  pCfg->cacheLastSize = pCreate->cacheLastSize;
2,589,438✔
229
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
2,588,987✔
230
  pCfg->isWeak = true;
2,589,601✔
231
  pCfg->isTsma = pCreate->isTsma;
2,590,222✔
232
  pCfg->tsdbCfg.compression = pCreate->compression;
2,588,942✔
233
  pCfg->tsdbCfg.precision = pCreate->precision;
2,589,496✔
234
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
2,588,371✔
235
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
2,589,070✔
236
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
2,589,814✔
237
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
2,589,425✔
238
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
2,589,383✔
239
  pCfg->tsdbCfg.minRows = pCreate->minRows;
2,587,112✔
240
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
2,588,658✔
241
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
242
  // pCfg->tsdbCfg.encryptAlgr = pCreate->encryptAlgr;
243
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
2,588,600✔
244
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
2,590,655✔
UNCOV
245
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
×
246
  }
247
#else
248
  pCfg->tsdbCfg.encryptAlgr = 0;
249
#endif
250

251
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
2,588,237✔
252
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
2,587,697✔
253
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
2,586,012✔
254
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
2,585,743✔
255
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
2,587,373✔
256
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
2,588,014✔
257
  pCfg->walCfg.level = pCreate->walLevel;
2,587,907✔
258
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
259
  // pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
260
  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
2,587,356✔
261
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
2,590,043✔
262
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
2,116✔
263
  }
264
#else
265
  pCfg->walCfg.encryptAlgr = 0;
266
#endif
267

268
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
269
  // pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
270
  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
2,587,528✔
271
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
2,588,964✔
272
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsDbKey, ENCRYPT_KEY_LEN + 1);
3,451✔
273
  }
274
#else
275
  pCfg->tdbEncryptAlgr = 0;
276
#endif
277

278
  pCfg->sttTrigger = pCreate->sstTrigger;
2,583,840✔
279
  pCfg->hashBegin = pCreate->hashBegin;
2,587,797✔
280
  pCfg->hashEnd = pCreate->hashEnd;
2,587,884✔
281
  pCfg->hashMethod = pCreate->hashMethod;
2,587,453✔
282
  pCfg->hashPrefix = pCreate->hashPrefix;
2,585,685✔
283
  pCfg->hashSuffix = pCreate->hashSuffix;
2,588,789✔
284
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
2,586,579✔
285

286
  pCfg->ssChunkSize = pCreate->ssChunkSize;
2,584,598✔
287
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
2,587,605✔
288
  pCfg->ssCompact = pCreate->ssCompact;
2,587,034✔
289

290
  pCfg->standby = 0;
2,585,122✔
291
  pCfg->syncCfg.replicaNum = 0;
2,586,986✔
292
  pCfg->syncCfg.totalReplicaNum = 0;
2,584,600✔
293
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
2,588,275✔
294

295
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
2,587,386✔
296
  for (int32_t i = 0; i < pCreate->replica; ++i) {
6,066,127✔
297
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
3,477,465✔
298
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
3,476,016✔
299
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
3,475,291✔
300
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
3,474,560✔
301
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
3,472,828✔
302
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
3,477,598✔
303
    pCfg->syncCfg.replicaNum++;
3,480,120✔
304
  }
305
  if (pCreate->selfIndex != -1) {
2,589,247✔
306
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
2,498,131✔
307
  }
308
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
2,679,133✔
309
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
91,943✔
310
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
91,943✔
311
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
91,943✔
312
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
91,943✔
313
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
91,943✔
314
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
91,943✔
315
    pCfg->syncCfg.totalReplicaNum++;
91,943✔
316
  }
317
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
2,585,112✔
318
  if (pCreate->learnerSelfIndex != -1) {
2,587,159✔
319
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
91,943✔
320
  }
321
}
2,587,060✔
322

323
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
2,585,729✔
324
  pCfg->vgId = pCreate->vgId;
2,585,729✔
325
  pCfg->vgVersion = pCreate->vgVersion;
2,584,406✔
326
  pCfg->dropped = 0;
2,586,878✔
327
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
2,581,261✔
328
}
2,584,428✔
329

330
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,579,995✔
331
  SCreateVnodeReq req = {0};
2,579,995✔
332
  SVnodeCfg       vnodeCfg = {0};
2,589,854✔
333
  SWrapperCfg     wrapperCfg = {0};
2,589,365✔
334
  int32_t         code = -1;
2,589,725✔
335
  char            path[TSDB_FILENAME_LEN] = {0};
2,589,725✔
336

337
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,588,412✔
338
    return TSDB_CODE_INVALID_MSG;
×
339
  }
340

341
  if (req.learnerReplica == 0) {
2,590,434✔
342
    req.learnerSelfIndex = -1;
2,498,491✔
343
  }
344

345
  dInfo(
2,590,434✔
346
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
347
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
348
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
349
      "precision:%d compression:%d minRows:%d maxRows:%d"
350
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
351
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
352
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s",
353
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
354
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
355
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
356
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
357
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
358
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
359
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
360
      req.encryptAlgorithm, req.encryptAlgrName);
361

362
  for (int32_t i = 0; i < req.replica; ++i) {
6,071,417✔
363
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
3,480,983✔
364
          req.replicas[i].id);
365
  }
366
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
2,682,377✔
367
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
91,943✔
368
          req.learnerReplicas[i].port, req.replicas[i].id);
369
  }
370

371
  SReplica *pReplica = NULL;
2,590,434✔
372
  if (req.selfIndex != -1) {
2,590,434✔
373
    pReplica = &req.replicas[req.selfIndex];
2,498,491✔
374
  } else {
375
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
91,943✔
376
  }
377
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
2,590,434✔
378
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
2,590,110✔
379
    (void)tFreeSCreateVnodeReq(&req);
714✔
380

381
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
382
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
×
383
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
384
    return code;
×
385
  }
386

387
  if (taosWaitCfgKeyLoaded() != 0) {
2,589,720✔
388
    (void)tFreeSCreateVnodeReq(&req);
×
389
    code = terrno;
×
390
    dError("vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.vgId, tstrerror(code));
×
391
    return code;
×
392
  }
393

394
  if (req.encryptAlgrName[0] != '\0' && strlen(tsDbKey) == 0) {
2,590,110✔
395
    (void)tFreeSCreateVnodeReq(&req);
×
396
    code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
397
    dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
398
    return code;
×
399
  }
400

401
  vmGenerateVnodeCfg(&req, &vnodeCfg);
2,590,110✔
402

403
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
2,585,414✔
404

405
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
2,587,731✔
406
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
2,590,110✔
407
    dError("vgId:%d, already exist", req.vgId);
10,949✔
408
    (void)tFreeSCreateVnodeReq(&req);
10,949✔
409
    vmReleaseVnode(pMgmt, pVnode);
10,949✔
410
    code = TSDB_CODE_VND_ALREADY_EXIST;
10,949✔
411
    return 0;
10,949✔
412
  }
413

414
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
2,579,161✔
415
  if (diskPrimary < 0) {
2,577,951✔
416
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
2,577,951✔
417
  }
418
  wrapperCfg.diskPrimary = diskPrimary;
2,579,485✔
419

420
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
2,579,485✔
421

422
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
2,579,485✔
423
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
424
    vmReleaseVnode(pMgmt, pVnode);
×
425
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
426
    (void)tFreeSCreateVnodeReq(&req);
×
427
    return code;
×
428
  }
429

430
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
2,579,485✔
431
  if (pImpl == NULL) {
2,579,485✔
432
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
433
    code = terrno != 0 ? terrno : -1;
×
434
    goto _OVER;
×
435
  }
436

437
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
2,579,485✔
438
  if (code != 0) {
2,579,485✔
439
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
440
    code = terrno != 0 ? terrno : code;
×
441
    goto _OVER;
×
442
  }
443

444
  code = vnodeStart(pImpl);
2,579,485✔
445
  if (code != 0) {
2,579,485✔
446
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
447
    goto _OVER;
×
448
  }
449

450
  code = vmWriteVnodeListToFile(pMgmt);
2,579,485✔
451
  if (code != 0) {
2,579,485✔
452
    code = terrno != 0 ? terrno : code;
×
453
    goto _OVER;
×
454
  }
455

456
_OVER:
2,579,485✔
457
  vmCleanPrimaryDisk(pMgmt, req.vgId);
2,579,485✔
458

459
  if (code != 0) {
2,578,848✔
460
    vmCloseFailedVnode(pMgmt, req.vgId);
×
461

462
    vnodeClose(pImpl);
×
463
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
464
  } else {
465
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
2,578,848✔
466
          TMSG_INFO(pMsg->msgType));
467
  }
468

469
  (void)tFreeSCreateVnodeReq(&req);
2,579,485✔
470
  terrno = code;
2,579,485✔
471
  return code;
2,579,485✔
472
}
473

474
#ifdef USE_MOUNT
475
typedef struct {
476
  int64_t dbId;
477
  int32_t vgId;
478
  int32_t diskPrimary;
479
} SMountDbVgId;
480
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
481
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
482

483
static int compareVnodeInfo(const void *p1, const void *p2) {
2,513✔
484
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
2,513✔
485
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
2,513✔
486

487
  if (v1->config.dbId == v2->config.dbId) {
2,513✔
488
    if (v1->config.vgId == v2->config.vgId) {
1,436✔
489
      return 0;
×
490
    }
491
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
1,436✔
492
  }
493

494
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
1,077✔
495
}
496
static int compareVgDiskPrimary(const void *p1, const void *p2) {
2,513✔
497
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
2,513✔
498
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
2,513✔
499

500
  if (v1->dbId == v2->dbId) {
2,513✔
501
    if (v1->vgId == v2->vgId) {
1,436✔
502
      return 0;
×
503
    }
504
    return v1->vgId > v2->vgId ? 1 : -1;
1,436✔
505
  }
506

507
  return v1->dbId > v2->dbId ? 1 : -1;
1,077✔
508
}
509

510
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
359✔
511
  int32_t   code = 0, lino = 0;
359✔
512
  TdFilePtr pFile = NULL;
359✔
513
  char     *content = NULL;
359✔
514
  SJson    *pJson = NULL;
359✔
515
  int64_t   size = 0;
359✔
516
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
359✔
517
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
359✔
518
  SArray   *pDisks = NULL;
359✔
519
  // step 1: fetch clusterId from dnode.json
520
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
359✔
521
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
359✔
522
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
359✔
523
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
359✔
524
  if (taosReadFile(pFile, content, size) != size) {
359✔
525
    TAOS_CHECK_EXIT(terrno);
×
526
  }
527
  content[size] = '\0';
359✔
528
  pJson = tjsonParse(content);
359✔
529
  if (pJson == NULL) {
359✔
530
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
531
  }
532
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
359✔
533
  TAOS_CHECK_EXIT(code);
359✔
534
  if (dropped == 1) {
359✔
535
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
536
  }
537
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
359✔
538
  TAOS_CHECK_EXIT(code);
359✔
539
  if (encryptScope != 0) {
359✔
540
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
541
  }
542
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
359✔
543
  TAOS_CHECK_EXIT(code);
359✔
544
  if (clusterId == 0) {
359✔
545
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
546
  }
547
  pMountInfo->clusterId = clusterId;
359✔
548
  if (content != NULL) taosMemoryFreeClear(content);
359✔
549
  if (pJson != NULL) {
359✔
550
    cJSON_Delete(pJson);
359✔
551
    pJson = NULL;
359✔
552
  }
553
  if (pFile != NULL) taosCloseFile(&pFile);
359✔
554
  // step 2: fetch dataDir from dnode/config/local.json
555
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
359✔
556
  int32_t nDisks = taosArrayGetSize(pDisks);
359✔
557
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
359✔
558
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
559
  }
560
  for (int32_t i = 0; i < nDisks; ++i) {
2,513✔
561
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
2,154✔
562
    if (!pMountInfo->pDisks[pDisk->level]) {
2,154✔
563
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
718✔
564
      if (!pMountInfo->pDisks[pDisk->level]) {
718✔
565
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
566
      }
567
    }
568
    char *pDir = taosStrdup(pDisk->dir);
2,154✔
569
    if (pDir == NULL) {
2,154✔
570
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
571
    }
572
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
2,154✔
573
      // put the primary disk to the first position of level 0
574
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
359✔
575
        taosMemFree(pDir);
×
576
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
577
      }
578
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
3,590✔
579
      taosMemFree(pDir);
×
580
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
581
    }
582
  }
583
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
1,436✔
584
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
1,077✔
585
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
1,077✔
586
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
587
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
588
    }
589
  }
590
_exit:
359✔
591
  if (content != NULL) taosMemoryFreeClear(content);
359✔
592
  if (pJson != NULL) cJSON_Delete(pJson);
359✔
593
  if (pFile != NULL) taosCloseFile(&pFile);
359✔
594
  if (pDisks != NULL) {
359✔
595
    taosArrayDestroy(pDisks);
359✔
596
    pDisks = NULL;
359✔
597
  }
598
  if (code != 0) {
359✔
599
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
600
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
601
  } else {
602
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
359✔
603
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
604
  }
605
  TAOS_RETURN(code);
359✔
606
}
607

608
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
359✔
609
  int32_t       code = 0, lino = 0;
359✔
610
  SWrapperCfg  *pCfgs = NULL;
359✔
611
  int32_t       numOfVnodes = 0;
359✔
612
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
359✔
613
  TdDirPtr      pDir = NULL;
359✔
614
  TdDirEntryPtr de = NULL;
359✔
615
  SVnodeMgmt    vnodeMgmt = {0};
359✔
616
  SArray       *pVgCfgs = NULL;
359✔
617
  SArray       *pDbInfos = NULL;
359✔
618
  SArray       *pDiskPrimarys = NULL;
359✔
619

620
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
359✔
621
  vnodeMgmt.path = path;
359✔
622
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
359✔
623
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
359✔
624
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
359✔
625
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
359✔
626

627
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
359✔
628
  int32_t nVgDropped = 0, j = 0;
359✔
629
  for (int32_t i = 0; i < numOfVnodes; ++i) {
1,795✔
630
    SWrapperCfg *pCfg = &pCfgs[i];
1,436✔
631
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
632
    if (nDiskLevel0 > 1) {
1,436✔
633
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
1,436✔
634
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
1,436✔
635
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
1,436✔
636
                     pCfg->vgId);
637
    }
638
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
1,436✔
639
    if (pCfg->dropped) {
1,436✔
640
      ++nVgDropped;
×
641
      continue;
×
642
    }
643
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
1,436✔
644
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
645
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
646
    }
647
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
1,436✔
648
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
1,436✔
649
    if (pInfo->config.syncCfg.replicaNum > 1) {
1,436✔
650
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
651
             pInfo->config.syncCfg.replicaNum);
652
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
653
    } else if (pInfo->config.vgId != pCfg->vgId) {
1,436✔
654
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
655
             pCfg->vgId);
656
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
657
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
1,436✔
658
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
1,436✔
659
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
1,436✔
660
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
×
661
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
662
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
663
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
664
    }
665
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
1,436✔
666
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
1,436✔
667
  }
668
  if (nVgDropped > 0) {
359✔
669
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
670
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
671
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
672
  }
673
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
359✔
674
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
359✔
675
  if (nVgCfg != nDiskPrimary) {
359✔
676
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
677
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
678
  }
679
  if (nVgCfg > 1) {
359✔
680
    taosArraySort(pVgCfgs, compareVnodeInfo);
359✔
681
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
359✔
682
  }
683

684
  int64_t clusterId = pMountInfo->clusterId;
359✔
685
  int64_t dbId = 0, vgId = 0, nDb = 0;
359✔
686
  for (int32_t i = 0; i < nVgCfg; ++i) {
1,303✔
687
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
1,067✔
688
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
1,067✔
689
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
123✔
690
             pInfo->config.syncCfg.nodeInfo->clusterId);
691
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
123✔
692
    }
693
    if (dbId != pInfo->config.dbId) {
944✔
694
      dbId = pInfo->config.dbId;
472✔
695
      ++nDb;
472✔
696
    }
697
    if (vgId == pInfo->config.vgId) {
944✔
698
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
699
    } else {
700
      vgId = pInfo->config.vgId;
944✔
701
    }
702
  }
703

704
  if (nDb > 0) {
236✔
705
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
236✔
706
    int32_t dbIdx = -1;
236✔
707
    for (int32_t i = 0; i < nVgCfg; ++i) {
1,180✔
708
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
944✔
709
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
944✔
710
      SMountDbInfo *pDbInfo = NULL;
944✔
711
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
944✔
712
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
472✔
713
        pDbInfo->dbId = pVgCfg->config.dbId;
472✔
714
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
472✔
715
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
472✔
716
      } else {
717
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
472✔
718
      }
719
      SMountVgInfo vgInfo = {
944✔
720
          .diskPrimary = pDiskPrimary->diskPrimary,
944✔
721
          .vgId = pVgCfg->config.vgId,
944✔
722
          .dbId = pVgCfg->config.dbId,
944✔
723
          .cacheLastSize = pVgCfg->config.cacheLastSize,
944✔
724
          .szPage = pVgCfg->config.szPage,
944✔
725
          .szCache = pVgCfg->config.szCache,
944✔
726
          .szBuf = pVgCfg->config.szBuf,
944✔
727
          .cacheLast = pVgCfg->config.cacheLast,
944✔
728
          .standby = pVgCfg->config.standby,
944✔
729
          .hashMethod = pVgCfg->config.hashMethod,
944✔
730
          .hashBegin = pVgCfg->config.hashBegin,
944✔
731
          .hashEnd = pVgCfg->config.hashEnd,
944✔
732
          .hashPrefix = pVgCfg->config.hashPrefix,
944✔
733
          .hashSuffix = pVgCfg->config.hashSuffix,
944✔
734
          .sttTrigger = pVgCfg->config.sttTrigger,
944✔
735
          .replications = pVgCfg->config.syncCfg.replicaNum,
944✔
736
          .precision = pVgCfg->config.tsdbCfg.precision,
944✔
737
          .compression = pVgCfg->config.tsdbCfg.compression,
944✔
738
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
944✔
739
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
944✔
740
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
944✔
741
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
944✔
742
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
944✔
743
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
944✔
744
          .minRows = pVgCfg->config.tsdbCfg.minRows,
944✔
745
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
944✔
746
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
944✔
747
          .ssChunkSize = pVgCfg->config.ssChunkSize,
944✔
748
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
944✔
749
          .ssCompact = pVgCfg->config.ssCompact,
944✔
750
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
944✔
751
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
944✔
752
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
944✔
753
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
944✔
754
          .walSegSize = pVgCfg->config.walCfg.segSize,
944✔
755
          .walLevel = pVgCfg->config.walCfg.level,
944✔
756
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
757
          .committed = pVgCfg->state.committed,
944✔
758
          .commitID = pVgCfg->state.commitID,
944✔
759
          .commitTerm = pVgCfg->state.commitTerm,
944✔
760
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
944✔
761
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
944✔
762
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
944✔
763
      };
764
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
1,888✔
765
    }
766
  }
767

768
  pMountInfo->pDbs = pDbInfos;
236✔
769

770
_exit:
359✔
771
  if (code != 0) {
359✔
772
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
123✔
773
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
774
  }
775
  taosArrayDestroy(pDiskPrimarys);
359✔
776
  taosArrayDestroy(pVgCfgs);
359✔
777
  taosMemoryFreeClear(pCfgs);
359✔
778
  TAOS_RETURN(code);
359✔
779
}
780

781
/**
782
 *   Retrieve the stables from vnode meta.
783
 */
784
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
236✔
785
  int32_t code = 0, lino = 0;
236✔
786
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
236✔
787
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
236✔
788
  SArray *suidList = NULL;
236✔
789
  SArray *pCols = NULL;
236✔
790
  SArray *pTags = NULL;
236✔
791
  SArray *pColExts = NULL;
236✔
792
  SArray *pTagExts = NULL;
236✔
793

794
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
236✔
795
  for (int32_t i = 0; i < nDb; ++i) {
708✔
796
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
472✔
797
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
472✔
798
    for (int32_t j = 0; j < nVg; ++j) {
472✔
799
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
472✔
800
      SVnode        vnode = {
472✔
801
                 .config.vgId = pVgInfo->vgId,
472✔
802
                 .config.dbId = pVgInfo->dbId,
472✔
803
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
472✔
804
                 .config.szPage = pVgInfo->szPage,
472✔
805
                 .config.szCache = pVgInfo->szCache,
472✔
806
                 .config.szBuf = pVgInfo->szBuf,
472✔
807
                 .config.cacheLast = pVgInfo->cacheLast,
472✔
808
                 .config.standby = pVgInfo->standby,
472✔
809
                 .config.hashMethod = pVgInfo->hashMethod,
472✔
810
                 .config.hashBegin = pVgInfo->hashBegin,
472✔
811
                 .config.hashEnd = pVgInfo->hashEnd,
472✔
812
                 .config.hashPrefix = pVgInfo->hashPrefix,
472✔
813
                 .config.hashSuffix = pVgInfo->hashSuffix,
472✔
814
                 .config.sttTrigger = pVgInfo->sttTrigger,
472✔
815
                 .config.syncCfg.replicaNum = pVgInfo->replications,
472✔
816
                 .config.tsdbCfg.precision = pVgInfo->precision,
472✔
817
                 .config.tsdbCfg.compression = pVgInfo->compression,
472✔
818
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
472✔
819
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
472✔
820
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
472✔
821
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
472✔
822
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
472✔
823
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
472✔
824
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
472✔
825
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
472✔
826
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
472✔
827
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
472✔
828
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
472✔
829
                 .config.ssCompact = pVgInfo->ssCompact,
472✔
830
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
472✔
831
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
472✔
832
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
472✔
833
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
472✔
834
                 .config.walCfg.segSize = pVgInfo->walSegSize,
472✔
835
                 .config.walCfg.level = pVgInfo->walLevel,
472✔
836
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
837
                 .diskPrimary = pVgInfo->diskPrimary,
472✔
838
      };
839
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
472✔
840
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
472✔
841
               pVgInfo->vgId);
842
      vnode.path = path;
472✔
843

844
      int32_t rollback = vnodeShouldRollback(&vnode);
472✔
845
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
472✔
846
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
847
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
848
        TAOS_CHECK_EXIT(code);
×
849
      } else {
850
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
472✔
851
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
852

853
        SMetaReader mr = {0};
472✔
854
        tb_uid_t    suid = 0;
472✔
855
        SMeta      *pMeta = vnode.pMeta;
472✔
856

857
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
472✔
858
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
472✔
859
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
860
        }
861
        taosArrayClear(suidList);
472✔
862
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
472✔
863
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
472✔
864
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
865
        int32_t nStbs = taosArrayGetSize(suidList);
472✔
866
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
472✔
867
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
868
        }
869
        for (int32_t i = 0; i < nStbs; ++i) {
1,888✔
870
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
1,416✔
871
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,416✔
872
                pVgInfo->dbId, suid, pReq->dnodeId);
873
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
1,416✔
874
            TSDB_CHECK_CODE(code, lino, _exit0);
×
875
          }
876
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
1,416✔
877
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
1,416✔
878
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
879
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
880
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
881
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
882
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
883
          }
884
          SMountStbInfo stbInfo = {
1,416✔
885
              .req.source = TD_REQ_FROM_APP,
886
              .req.suid = suid,
887
              .req.colVer = mr.me.stbEntry.schemaRow.version,
1,416✔
888
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
1,416✔
889
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
1,416✔
890
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
1,416✔
891
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
1,416✔
892
          };
893
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
1,416✔
894
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
1,416✔
895
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
896
          }
897
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
1,416✔
898
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
899
          }
900

901
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
1,416✔
902
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
903
          }
904
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
1,416✔
905
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
906
          }
907
          taosArrayClear(pCols);
1,416✔
908
          taosArrayClear(pTags);
1,416✔
909
          taosArrayClear(pColExts);
1,416✔
910
          taosArrayClear(pTagExts);
1,416✔
911
          stbInfo.req.pColumns = pCols;
1,416✔
912
          stbInfo.req.pTags = pTags;
1,416✔
913
          stbInfo.pColExts = pColExts;
1,416✔
914
          stbInfo.pTagExts = pTagExts;
1,416✔
915

916
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
8,496✔
917
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
7,080✔
918
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
7,080✔
919
            SFieldWithOptions col = {
7,080✔
920
                .type = pSchema->type,
7,080✔
921
                .flags = pSchema->flags,
7,080✔
922
                .bytes = pSchema->bytes,
7,080✔
923
                .compress = pColComp->alg,
7,080✔
924
            };
925
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
7,080✔
926
            if (pSchema->colId != pColComp->id) {
7,080✔
927
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
928
            }
929
            if (mr.me.pExtSchemas) {
7,080✔
930
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
931
            }
932
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
7,080✔
933
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
14,160✔
934
          }
935
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
3,304✔
936
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
1,888✔
937
            SField   tag = {
1,888✔
938
                  .type = pSchema->type,
1,888✔
939
                  .flags = pSchema->flags,
1,888✔
940
                  .bytes = pSchema->bytes,
1,888✔
941
            };
942
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
1,888✔
943
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
1,888✔
944
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
3,776✔
945
          }
946
          tDecoderClear(&mr.coder);
1,416✔
947

948
          // serialize the SMountStbInfo
949
          int32_t firstPartLen = 0;
1,416✔
950
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
1,416✔
951
          if (msgLen <= 0) {
1,416✔
952
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
953
          }
954
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
1,416✔
955
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
1,416✔
956
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
1,416✔
957
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
1,416✔
958
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
1,416✔
959
            taosMemoryFree(pBuf);
×
960
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
961
          }
962
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
2,832✔
963
            taosMemoryFree(pBuf);
×
964
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
965
          }
966
        }
967
      _exit0:
472✔
968
        metaReaderClear(&mr);
472✔
969
        metaClose(&vnode.pMeta);
472✔
970
        TAOS_CHECK_EXIT(code);
472✔
971
      }
972
      break;  // retrieve stbs from one vnode is enough
472✔
973
    }
974
  }
975
_exit:
236✔
976
  if (code != 0) {
236✔
977
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
978
           pReq->dnodeId, tstrerror(code), path);
979
  }
980
  taosArrayDestroy(suidList);
236✔
981
  taosArrayDestroy(pCols);
236✔
982
  taosArrayDestroy(pTags);
236✔
983
  taosArrayDestroy(pColExts);
236✔
984
  taosArrayDestroy(pTagExts);
236✔
985
  TAOS_RETURN(code);
236✔
986
}
987

988
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
1,074✔
989
  int32_t code = 0, lino = 0;
1,074✔
990
  int32_t retryTimes = 0;
1,074✔
991
  char    filepath[PATH_MAX] = {0};
1,074✔
992
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
1,074✔
993
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
1,074✔
994
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
995
                  code, lino, _exit, terrno);
996
  int32_t ret = 0;
1,074✔
997
  do {
998
    ret = taosLockFile(*pFile);
1,320✔
999
    if (ret == 0) break;
1,320✔
1000
    taosMsleep(1000);
369✔
1001
    ++retryTimes;
369✔
1002
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
369✔
1003
  } while (retryTimes < retryLimit);
369✔
1004
  TAOS_CHECK_EXIT(ret);
1,074✔
1005
_exit:
1,074✔
1006
  if (code != 0) {
1,074✔
1007
    (void)taosCloseFile(pFile);
123✔
1008
    *pFile = NULL;
123✔
1009
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
123✔
1010
           filepath);
1011
  }
1012
  TAOS_RETURN(code);
1,074✔
1013
}
1014

1015
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
728✔
1016
  int32_t code = 0, lino = 0;
728✔
1017
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
728✔
1018
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
728✔
1019
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
605✔
1020
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
482✔
1021
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
482✔
1022
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
359✔
1023
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
359✔
1024
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
359✔
1025
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
359✔
1026
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
359✔
1027
           TD_DIRSEP);
1028
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
359✔
1029
_exit:
728✔
1030
  if (code != 0) {
728✔
1031
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
369✔
1032
           pReq->dnodeId, tstrerror(code), path);
1033
  }
1034
  TAOS_RETURN(code);
728✔
1035
}
1036

1037
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
728✔
1038
                                       SMountInfo *pMountInfo) {
1039
  int32_t code = 0, lino = 0;
728✔
1040
  pMountInfo->dnodeId = pReq->dnodeId;
728✔
1041
  pMountInfo->mountUid = pReq->mountUid;
728✔
1042
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
728✔
1043
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
728✔
1044
  pMountInfo->ignoreExist = pReq->ignoreExist;
728✔
1045
  pMountInfo->valLen = pReq->valLen;
728✔
1046
  pMountInfo->pVal = pReq->pVal;
728✔
1047
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
728✔
1048
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
359✔
1049
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
359✔
1050
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
236✔
1051
_exit:
236✔
1052
  if (code != 0) {
728✔
1053
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
492✔
1054
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1055
  }
1056
  TAOS_RETURN(code);
728✔
1057
}
1058

1059
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
728✔
1060
  int32_t               code = 0, lino = 0;
728✔
1061
  int32_t               rspCode = 0;
728✔
1062
  SVnodeMgmt            vndMgmt = {0};
728✔
1063
  SMountInfo            mountInfo = {0};
728✔
1064
  void                 *pBuf = NULL;
728✔
1065
  int32_t               bufLen = 0;
728✔
1066
  SRetrieveMountPathReq req = {0};
728✔
1067

1068
  vndMgmt = *pMgmt;
728✔
1069
  vndMgmt.path = NULL;
728✔
1070
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
728✔
1071
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
728✔
1072
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
728✔
1073
_end:
728✔
1074
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
728✔
1075
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
728✔
1076
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
728✔
1077
  pMsg->info.rsp = pBuf;
728✔
1078
  pMsg->info.rspLen = bufLen;
728✔
1079
_exit:
728✔
1080
  if (rspCode != 0) {
728✔
1081
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1082
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1083
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1084
    rpcFreeCont(pBuf);
×
1085
    code = rspCode;
×
1086
  } else if (code != 0) {
728✔
1087
    // the client would receive the response with error msg
1088
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
492✔
1089
           req.dnodeId, tstrerror(code), req.mountPath);
1090
  } else {
1091
    int32_t nVgs = 0;
236✔
1092
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
236✔
1093
    for (int32_t i = 0; i < nDbs; ++i) {
708✔
1094
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
472✔
1095
      nVgs += taosArrayGetSize(pDb->pVgs);
472✔
1096
    }
1097
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
236✔
1098
  }
1099
  taosMemFreeClear(vndMgmt.path);
728✔
1100
  tFreeMountInfo(&mountInfo, false);
728✔
1101
  TAOS_RETURN(code);
728✔
1102
}
1103

1104
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
944✔
1105
                            SMountVnodeReq *req, STfs *pMountTfs) {
1106
  int32_t    code = 0;
944✔
1107
  SVnodeInfo info = {0};
944✔
1108
  char       hostDir[TSDB_FILENAME_LEN] = {0};
944✔
1109
  char       mountDir[TSDB_FILENAME_LEN] = {0};
944✔
1110
  char       mountVnode[32] = {0};
944✔
1111

1112
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
944✔
1113
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1114
    return code;
×
1115
  }
1116

1117
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
944✔
1118
  if ((code = taosMkDir(hostDir))) {
944✔
1119
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1120
           tstrerror(code), hostDir);
1121
    return code;
×
1122
  }
1123

1124
  info.config = *pCfg;  // copy the config
944✔
1125
  info.state.committed = req->committed;
944✔
1126
  info.state.commitID = req->commitID;
944✔
1127
  info.state.commitTerm = req->commitTerm;
944✔
1128
  info.state.applied = req->committed;
944✔
1129
  info.state.applyTerm = req->commitTerm;
944✔
1130
  info.config.vndStats.numOfSTables = req->numOfSTables;
944✔
1131
  info.config.vndStats.numOfCTables = req->numOfCTables;
944✔
1132
  info.config.vndStats.numOfNTables = req->numOfNTables;
944✔
1133

1134
  SVnodeInfo oldInfo = {0};
944✔
1135
  oldInfo.config = vnodeCfgDefault;
944✔
1136
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
944✔
1137
    if (oldInfo.config.dbId != info.config.dbId) {
×
1138
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1139
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1140
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1141
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1142
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1143
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1144

1145
    } else {
1146
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1147
    }
1148
    return code;
×
1149
  }
1150

1151
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
944✔
1152
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
944✔
1153
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
944✔
1154
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
944✔
1155
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1156
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
5,664✔
1157
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
4,720✔
1158
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
4,720✔
1159
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
4,597✔
1160
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1161
             mountSubDir, hostSubDir, tstrerror(code));
1162
      return code;
×
1163
    }
1164
  }
1165
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
944✔
1166
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
944✔
1167
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1168
           req->mountName, tstrerror(code), hostDir);
1169
    return code;
×
1170
  }
1171
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
944✔
1172
  return 0;
944✔
1173
}
1174

1175
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
944✔
1176
  int32_t          code = 0, lino = 0;
944✔
1177
  SMountVnodeReq   req = {0};
944✔
1178
  SCreateVnodeReq *pCreateReq = &req.createReq;
944✔
1179
  SVnodeCfg        vnodeCfg = {0};
944✔
1180
  SWrapperCfg      wrapperCfg = {0};
944✔
1181
  SVnode          *pImpl = NULL;
944✔
1182
  STfs            *pMountTfs = NULL;
944✔
1183
  char             path[TSDB_FILENAME_LEN] = {0};
944✔
1184
  bool             releaseTfs = false;
944✔
1185

1186
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
944✔
1187
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1188
    return TSDB_CODE_INVALID_MSG;
×
1189
  }
1190

1191
  if (pCreateReq->learnerReplica == 0) {
944✔
1192
    pCreateReq->learnerSelfIndex = -1;
944✔
1193
  }
1194
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
1,888✔
1195
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
944✔
1196
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1197
  }
1198
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
944✔
1199
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1200
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1201
  }
1202

1203
  SReplica *pReplica = NULL;
944✔
1204
  if (pCreateReq->selfIndex != -1) {
944✔
1205
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
944✔
1206
  } else {
1207
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1208
  }
1209
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
944✔
1210
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
944✔
1211
    (void)tFreeSMountVnodeReq(&req);
×
1212
    code = TSDB_CODE_INVALID_MSG;
×
1213
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1214
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1215
    return code;
×
1216
  }
1217
  
1218
  if (taosWaitCfgKeyLoaded() != 0) {
944✔
1219
    (void)tFreeSMountVnodeReq(&req);
×
1220
    code = terrno;
×
1221
    dError("mount:%s, vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.mountName,
×
1222
           pCreateReq->vgId, tstrerror(code));
1223
    return code;
×
1224
  }
1225

1226
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
944✔
1227
  vnodeCfg.mountVgId = req.mountVgId;
944✔
1228
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
944✔
1229
  wrapperCfg.mountId = req.mountId;
944✔
1230

1231
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
944✔
1232
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
944✔
1233
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1234
    (void)tFreeSMountVnodeReq(&req);
×
1235
    vmReleaseVnode(pMgmt, pVnode);
×
1236
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1237
    return 0;
×
1238
  }
1239
  vmReleaseVnode(pMgmt, pVnode);
944✔
1240

1241
  wrapperCfg.diskPrimary = req.diskPrimary;
944✔
1242
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
944✔
1243
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
944✔
1244
  releaseTfs = true;
944✔
1245

1246
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
944✔
1247
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
944✔
1248
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1249
  }
1250
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
944✔
1251
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1252
  }
1253
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
944✔
1254
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
944✔
1255
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
944✔
1256
_exit:
944✔
1257
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
944✔
1258
  if (code != 0) {
944✔
1259
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1260
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1261
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1262
    vnodeClose(pImpl);
×
1263
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1264
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1265
  } else {
1266
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
944✔
1267
          TMSG_INFO(pMsg->msgType));
1268
  }
1269

1270
  pMsg->code = code;
944✔
1271
  pMsg->info.rsp = NULL;
944✔
1272
  pMsg->info.rspLen = 0;
944✔
1273

1274
  (void)tFreeSMountVnodeReq(&req);
944✔
1275
  TAOS_RETURN(code);
944✔
1276
}
1277
#endif  // USE_MOUNT
1278

1279
// alter replica doesn't use this, but restore dnode still use this
1280
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,755,984✔
1281
  SAlterVnodeTypeReq req = {0};
1,755,984✔
1282
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
1,755,984✔
1283
    terrno = TSDB_CODE_INVALID_MSG;
×
1284
    return -1;
×
1285
  }
1286

1287
  if (req.learnerReplicas == 0) {
1288
    req.learnerSelfIndex = -1;
1289
  }
1290

1291
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
1,755,984✔
1292
        TMSG_INFO(pMsg->msgType));
1293

1294
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
1,755,984✔
1295
  if (pVnode == NULL) {
1,755,984✔
1296
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1297
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1298
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1299
    return -1;
×
1300
  }
1301

1302
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
1,755,984✔
1303
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
1,755,984✔
1304
  if (role == TAOS_SYNC_ROLE_VOTER) {
1,755,984✔
1305
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1306
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1307
    vmReleaseVnode(pMgmt, pVnode);
×
1308
    return -1;
×
1309
  }
1310

1311
  dInfo("vgId:%d, checking node catch up", req.vgId);
1,755,984✔
1312
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
1,755,984✔
1313
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
1,665,517✔
1314
    vmReleaseVnode(pMgmt, pVnode);
1,665,517✔
1315
    return -1;
1,665,517✔
1316
  }
1317

1318
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
90,467✔
1319

1320
  int32_t vgId = req.vgId;
90,467✔
1321
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
90,467✔
1322
        req.selfIndex, req.strict, req.changeVersion);
1323
  for (int32_t i = 0; i < req.replica; ++i) {
361,720✔
1324
    SReplica *pReplica = &req.replicas[i];
271,253✔
1325
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
271,253✔
1326
  }
1327
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
90,467✔
1328
    SReplica *pReplica = &req.learnerReplicas[i];
×
1329
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1330
  }
1331

1332
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
90,467✔
1333
      req.learnerSelfIndex >= req.learnerReplica) {
90,467✔
1334
    terrno = TSDB_CODE_INVALID_MSG;
×
1335
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1336
    vmReleaseVnode(pMgmt, pVnode);
×
1337
    return -1;
×
1338
  }
1339

1340
  SReplica *pReplica = NULL;
90,467✔
1341
  if (req.selfIndex != -1) {
90,467✔
1342
    pReplica = &req.replicas[req.selfIndex];
90,467✔
1343
  } else {
1344
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1345
  }
1346

1347
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
90,467✔
1348
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
90,467✔
1349
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1350
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1351
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1352
    vmReleaseVnode(pMgmt, pVnode);
×
1353
    return -1;
×
1354
  }
1355

1356
  dInfo("vgId:%d, start to close vnode", vgId);
90,467✔
1357
  SWrapperCfg wrapperCfg = {
90,467✔
1358
      .dropped = pVnode->dropped,
90,467✔
1359
      .vgId = pVnode->vgId,
90,467✔
1360
      .vgVersion = pVnode->vgVersion,
90,467✔
1361
      .diskPrimary = pVnode->diskPrimary,
90,467✔
1362
  };
1363
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
90,467✔
1364

1365
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
90,467✔
1366
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
90,467✔
1367

1368
  int32_t diskPrimary = wrapperCfg.diskPrimary;
90,467✔
1369
  char    path[TSDB_FILENAME_LEN] = {0};
90,467✔
1370
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
90,467✔
1371

1372
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
90,467✔
1373
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
90,467✔
1374
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1375
    return -1;
×
1376
  }
1377

1378
  dInfo("vgId:%d, begin to open vnode", vgId);
90,467✔
1379
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
90,467✔
1380
  if (pImpl == NULL) {
90,467✔
1381
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1382
    return -1;
×
1383
  }
1384

1385
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
90,467✔
1386
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1387
    return -1;
×
1388
  }
1389

1390
  if (vnodeStart(pImpl) != 0) {
90,467✔
1391
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1392
    return -1;
×
1393
  }
1394

1395
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
90,467✔
1396
        req.vgId, TMSG_INFO(pMsg->msgType));
1397
  return 0;
90,467✔
1398
}
1399

1400
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1401
  SCheckLearnCatchupReq req = {0};
×
1402
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1403
    terrno = TSDB_CODE_INVALID_MSG;
×
1404
    return -1;
×
1405
  }
1406

1407
  if (req.learnerReplicas == 0) {
1408
    req.learnerSelfIndex = -1;
1409
  }
1410

1411
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1412
        TMSG_INFO(pMsg->msgType));
1413

1414
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1415
  if (pVnode == NULL) {
×
1416
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1417
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1418
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1419
    return -1;
×
1420
  }
1421

1422
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1423
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1424
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1425
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1426
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1427
    vmReleaseVnode(pMgmt, pVnode);
×
1428
    return -1;
×
1429
  }
1430

1431
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1432
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1433
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1434
    vmReleaseVnode(pMgmt, pVnode);
×
1435
    return -1;
×
1436
  }
1437

1438
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1439

1440
  vmReleaseVnode(pMgmt, pVnode);
×
1441

1442
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1443
        TMSG_INFO(pMsg->msgType));
1444

1445
  return 0;
×
1446
}
1447

1448
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
24,774✔
1449
  SDisableVnodeWriteReq req = {0};
24,774✔
1450
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
24,774✔
1451
    terrno = TSDB_CODE_INVALID_MSG;
×
1452
    return -1;
×
1453
  }
1454

1455
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
24,774✔
1456

1457
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
24,774✔
1458
  if (pVnode == NULL) {
24,774✔
1459
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1460
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1461
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1462
    return -1;
×
1463
  }
1464

1465
  pVnode->disable = req.disable;
24,774✔
1466
  vmReleaseVnode(pMgmt, pVnode);
24,774✔
1467
  return 0;
24,774✔
1468
}
1469

1470
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,166✔
1471
  SMsgHead *pHead = pMsg->pCont;
3,166✔
1472
  pHead->contLen = ntohl(pHead->contLen);
3,166✔
1473
  pHead->vgId = ntohl(pHead->vgId);
3,166✔
1474

1475
  SVndSetKeepVersionReq req = {0};
3,166✔
1476
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
3,166✔
1477
                                        &req) != 0) {
1478
    terrno = TSDB_CODE_INVALID_MSG;
×
1479
    return -1;
×
1480
  }
1481

1482
  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);
3,166✔
1483

1484
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
3,166✔
1485
  if (pVnode == NULL) {
3,166✔
1486
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
×
1487
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1488
    return -1;
×
1489
  }
1490

1491
  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
1492
  // This bypasses Raft to avoid timing issues where WAL might be deleted
1493
  // before keepVersion is set through the Raft consensus process
1494
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
3,166✔
1495
  if (code != TSDB_CODE_SUCCESS) {
3,166✔
1496
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
×
1497
    terrno = code;
×
1498
    vmReleaseVnode(pMgmt, pVnode);
×
1499
    return -1;
×
1500
  }
1501

1502
  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);
3,166✔
1503

1504
  vmReleaseVnode(pMgmt, pVnode);
3,166✔
1505
  return 0;
3,166✔
1506
}
1507

1508
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
24,103✔
1509
  SAlterVnodeHashRangeReq req = {0};
24,103✔
1510
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
24,103✔
1511
    terrno = TSDB_CODE_INVALID_MSG;
×
1512
    return -1;
×
1513
  }
1514

1515
  int32_t srcVgId = req.srcVgId;
24,103✔
1516
  int32_t dstVgId = req.dstVgId;
24,103✔
1517

1518
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
24,103✔
1519
  if (pVnode != NULL) {
24,103✔
1520
    dError("vgId:%d, vnode already exist", dstVgId);
×
1521
    vmReleaseVnode(pMgmt, pVnode);
×
1522
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1523
    return -1;
×
1524
  }
1525

1526
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
24,103✔
1527
        req.dstVgId);
1528
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
24,103✔
1529
  if (pVnode == NULL) {
24,103✔
1530
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1531
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1532
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1533
    return -1;
×
1534
  }
1535

1536
  SWrapperCfg wrapperCfg = {
24,103✔
1537
      .dropped = pVnode->dropped,
24,103✔
1538
      .vgId = dstVgId,
1539
      .vgVersion = pVnode->vgVersion,
24,103✔
1540
      .diskPrimary = pVnode->diskPrimary,
24,103✔
1541
  };
1542
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
24,103✔
1543

1544
  // prepare alter
1545
  pVnode->toVgId = dstVgId;
24,103✔
1546
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
24,103✔
1547
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1548
    return -1;
×
1549
  }
1550

1551
  dInfo("vgId:%d, close vnode", srcVgId);
24,103✔
1552
  vmCloseVnode(pMgmt, pVnode, true, false);
24,103✔
1553

1554
  int32_t diskPrimary = wrapperCfg.diskPrimary;
24,103✔
1555
  char    srcPath[TSDB_FILENAME_LEN] = {0};
24,103✔
1556
  char    dstPath[TSDB_FILENAME_LEN] = {0};
24,103✔
1557
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
24,103✔
1558
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
24,103✔
1559

1560
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
24,103✔
1561
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
24,103✔
1562
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1563
    return -1;
×
1564
  }
1565

1566
  dInfo("vgId:%d, open vnode", dstVgId);
24,103✔
1567
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
24,103✔
1568

1569
  if (pImpl == NULL) {
24,103✔
1570
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1571
    return -1;
×
1572
  }
1573

1574
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
24,103✔
1575
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1576
    return -1;
×
1577
  }
1578

1579
  if (vnodeStart(pImpl) != 0) {
24,103✔
1580
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1581
    return -1;
×
1582
  }
1583

1584
  // complete alter
1585
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
24,103✔
1586
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1587
    return -1;
×
1588
  }
1589

1590
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
24,103✔
1591
  return 0;
24,103✔
1592
}
1593

1594
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
632,974✔
1595
  SAlterVnodeReplicaReq alterReq = {0};
632,974✔
1596
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
632,974✔
1597
    terrno = TSDB_CODE_INVALID_MSG;
×
1598
    return -1;
×
1599
  }
1600

1601
  if (alterReq.learnerReplica == 0) {
632,974✔
1602
    alterReq.learnerSelfIndex = -1;
453,739✔
1603
  }
1604

1605
  int32_t vgId = alterReq.vgId;
632,974✔
1606
  dInfo(
632,974✔
1607
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1608
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1609
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1610
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1611

1612
  for (int32_t i = 0; i < alterReq.replica; ++i) {
2,466,437✔
1613
    SReplica *pReplica = &alterReq.replicas[i];
1,833,463✔
1614
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
1,833,463✔
1615
  }
1616
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
812,209✔
1617
    SReplica *pReplica = &alterReq.learnerReplicas[i];
179,235✔
1618
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
179,235✔
1619
  }
1620

1621
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
632,974✔
1622
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
632,974✔
1623
    terrno = TSDB_CODE_INVALID_MSG;
×
1624
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1625
    return -1;
×
1626
  }
1627

1628
  SReplica *pReplica = NULL;
632,974✔
1629
  if (alterReq.selfIndex != -1) {
632,974✔
1630
    pReplica = &alterReq.replicas[alterReq.selfIndex];
632,974✔
1631
  } else {
1632
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1633
  }
1634

1635
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
632,974✔
1636
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
632,974✔
1637
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1638
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
×
1639
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1640
    return -1;
×
1641
  }
1642

1643
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
632,974✔
1644
  if (pVnode == NULL) {
632,974✔
1645
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1646
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1647
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1648
    return -1;
×
1649
  }
1650

1651
  dInfo("vgId:%d, start to close vnode", vgId);
632,974✔
1652
  SWrapperCfg wrapperCfg = {
632,974✔
1653
      .dropped = pVnode->dropped,
632,974✔
1654
      .vgId = pVnode->vgId,
632,974✔
1655
      .vgVersion = pVnode->vgVersion,
632,974✔
1656
      .diskPrimary = pVnode->diskPrimary,
632,974✔
1657
  };
1658
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
632,974✔
1659

1660
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
632,974✔
1661
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
632,974✔
1662

1663
  int32_t diskPrimary = wrapperCfg.diskPrimary;
632,974✔
1664
  char    path[TSDB_FILENAME_LEN] = {0};
632,974✔
1665
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
632,974✔
1666

1667
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
632,974✔
1668
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
632,974✔
1669
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1670
    return -1;
×
1671
  }
1672

1673
  dInfo("vgId:%d, begin to open vnode", vgId);
632,974✔
1674
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
632,974✔
1675
  if (pImpl == NULL) {
632,974✔
1676
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1677
    return -1;
×
1678
  }
1679

1680
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
632,974✔
1681
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1682
    return -1;
×
1683
  }
1684

1685
  if (vnodeStart(pImpl) != 0) {
632,974✔
1686
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1687
    return -1;
×
1688
  }
1689

1690
  dInfo(
632,974✔
1691
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1692
      "learnerSelfIndex:%d strict:%d",
1693
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1694
      alterReq.learnerSelfIndex, alterReq.strict);
1695
  return 0;
632,974✔
1696
}
1697

1698
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
22,942✔
1699
  SAlterVnodeElectBaselineReq alterReq = {0};
22,942✔
1700
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
22,942✔
1701
    return TSDB_CODE_INVALID_MSG;
×
1702
  }
1703

1704
  int32_t vgId = alterReq.vgId;
22,942✔
1705
  dInfo(
22,942✔
1706
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
1707
      vgId, TMSG_INFO(pMsg->msgType), alterReq.electBaseLine);
1708

1709
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
22,942✔
1710
  if (pVnode == NULL) {
22,942✔
1711
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1712
    return terrno;
×
1713
  }
1714

1715
  if(vnodeSetElectBaseline(pVnode->pImpl, alterReq.electBaseLine) != 0){
22,942✔
1716
    vmReleaseVnode(pMgmt, pVnode);
×
1717
    return -1;
×
1718
  }
1719

1720
  vmReleaseVnode(pMgmt, pVnode);
22,942✔
1721
  return 0;
22,942✔
1722
}
1723

1724
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,394,886✔
1725
  int32_t       code = 0;
1,394,886✔
1726
  SDropVnodeReq dropReq = {0};
1,394,886✔
1727
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
1,394,886✔
1728
    terrno = TSDB_CODE_INVALID_MSG;
×
1729
    return terrno;
×
1730
  }
1731

1732
  int32_t vgId = dropReq.vgId;
1,394,886✔
1733
  dInfo("vgId:%d, start to drop vnode", vgId);
1,394,886✔
1734

1735
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
1,394,886✔
1736
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1737
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
×
1738
    return terrno;
×
1739
  }
1740

1741
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
1,394,886✔
1742
  if (pVnode == NULL) {
1,394,886✔
1743
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1744
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1745
    return terrno;
×
1746
  }
1747

1748
  pVnode->dropped = 1;
1,394,886✔
1749
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
1,394,886✔
1750
    pVnode->dropped = 0;
×
1751
    vmReleaseVnode(pMgmt, pVnode);
×
1752
    return code;
×
1753
  }
1754

1755
  vmCloseVnode(pMgmt, pVnode, false, false);
1,394,886✔
1756
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
1,394,886✔
1757
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1758
  }
1759

1760
  dInfo("vgId:%d, is dropped", vgId);
1,394,886✔
1761
  return 0;
1,394,886✔
1762
}
1763

1764
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
113,089✔
1765
  SVArbHeartBeatReq arbHbReq = {0};
113,089✔
1766
  SVArbHeartBeatRsp arbHbRsp = {0};
113,089✔
1767
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
113,089✔
1768
    terrno = TSDB_CODE_INVALID_MSG;
×
1769
    return -1;
×
1770
  }
1771

1772
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
113,089✔
1773
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1774
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
×
1775
    goto _OVER;
×
1776
  }
1777

1778
  if (strlen(arbHbReq.arbToken) == 0) {
113,089✔
1779
    terrno = TSDB_CODE_INVALID_MSG;
×
1780
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1781
    goto _OVER;
×
1782
  }
1783

1784
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
113,089✔
1785

1786
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
113,089✔
1787
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
113,089✔
1788
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
113,089✔
1789
  if (arbHbRsp.hbMembers == NULL) {
113,089✔
1790
    goto _OVER;
×
1791
  }
1792

1793
  for (int32_t i = 0; i < size; i++) {
242,053✔
1794
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
128,964✔
1795
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
128,964✔
1796
    if (pVnode == NULL) {
128,964✔
1797
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
20,040✔
1798
      continue;
20,040✔
1799
    }
1800

1801
    SVArbHbRspMember rspMember = {0};
108,924✔
1802
    rspMember.vgId = pReqMember->vgId;
108,924✔
1803
    rspMember.hbSeq = pReqMember->hbSeq;
108,924✔
1804
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
108,924✔
1805
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1806
      vmReleaseVnode(pMgmt, pVnode);
×
1807
      continue;
×
1808
    }
1809

1810
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
108,924✔
1811
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1812
      vmReleaseVnode(pMgmt, pVnode);
×
1813
      continue;
×
1814
    }
1815

1816
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
217,848✔
1817
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1818
      vmReleaseVnode(pMgmt, pVnode);
×
1819
      goto _OVER;
×
1820
    }
1821

1822
    vmReleaseVnode(pMgmt, pVnode);
108,924✔
1823
  }
1824

1825
  SRpcMsg rspMsg = {.info = pMsg->info};
113,089✔
1826
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
113,089✔
1827
  if (rspLen < 0) {
113,089✔
1828
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1829
    goto _OVER;
×
1830
  }
1831

1832
  void *pRsp = rpcMallocCont(rspLen);
113,089✔
1833
  if (pRsp == NULL) {
113,089✔
1834
    terrno = terrno;
×
1835
    goto _OVER;
×
1836
  }
1837

1838
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
113,089✔
1839
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1840
    rpcFreeCont(pRsp);
×
1841
    goto _OVER;
×
1842
  }
1843
  pMsg->info.rsp = pRsp;
113,089✔
1844
  pMsg->info.rspLen = rspLen;
113,089✔
1845

1846
  terrno = TSDB_CODE_SUCCESS;
113,089✔
1847

1848
_OVER:
113,089✔
1849
  tFreeSVArbHeartBeatReq(&arbHbReq);
113,089✔
1850
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
113,089✔
1851
  return terrno;
113,089✔
1852
}
1853

1854
SArray *vmGetMsgHandles() {
540,976✔
1855
  int32_t code = -1;
540,976✔
1856
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
540,976✔
1857
  if (pArray == NULL) goto _OVER;
540,976✔
1858

1859
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1860
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
540,976✔
1861
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
540,976✔
1862
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
540,976✔
1863
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1864
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1865
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1866
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1868
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1869
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1870
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1871
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1872
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1873
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1874
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1875
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1877
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1878
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1881
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1882
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1883
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1884
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1885
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1886
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1887
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
540,976✔
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
540,976✔
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1891
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1892
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1893
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1894
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1895
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1896
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1899
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1900
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1901
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1902
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1903
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1904
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1905
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1906
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1907
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1908

1909
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1910
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1911
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1912
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1913
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1914
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1915
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1916
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1917
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1918
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1919
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1920
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
540,976✔
1921
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1922
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1923
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1924
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1925
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1926
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1927
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1928
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1929
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1930
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1931
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1932

1933
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1934
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1935
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1936
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1937
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1938
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1939
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1940
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1941
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1942
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1943
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1944
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1945

1946
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
540,976✔
1947
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
540,976✔
1948
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
540,976✔
1949
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
540,976✔
1950
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
540,976✔
1951

1952
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
540,976✔
1953
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
540,976✔
1954
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
540,976✔
1955

1956
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
540,976✔
1957
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
540,976✔
1958
  code = 0;
540,976✔
1959

1960
_OVER:
540,976✔
1961
  if (code != 0) {
540,976✔
1962
    taosArrayDestroy(pArray);
×
1963
    return NULL;
×
1964
  } else {
1965
    return pArray;
540,976✔
1966
  }
1967
}
1968

1969
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1970
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1971

1972
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1973
  while (pIter) {
×
1974
    SVnodeObj **ppVnode = pIter;
×
1975
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1976
      continue;
×
1977
    }
1978

1979
    SVnodeObj *pVnode = *ppVnode;
×
1980
    if (!pVnode->failed) {
×
1981
      SRawWriteMetrics metrics = {0};
×
1982
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1983
        // Add the metrics to the global metrics system with cluster ID
1984
        SName   name = {0};
×
1985
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1986
        if (code < 0) {
×
1987
          dError("failed to get db name since %s", tstrerror(code));
×
1988
          continue;
×
1989
        }
1990
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1991
        if (code != TSDB_CODE_SUCCESS) {
×
1992
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1993
        } else {
1994
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1995
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1996
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1997
          }
1998
        }
1999
      } else {
2000
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
2001
      }
2002
    }
2003
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
2004
  }
2005

2006
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
2007
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc