• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4871

04 Dec 2025 01:55AM UTC coverage: 64.654% (+0.1%) from 64.545%
#4871

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

869 of 2219 new or added lines in 36 files covered. (39.16%)

441 existing lines in 120 files now uncovered.

159620 of 246882 relevant lines covered (64.65%)

110922946.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.89
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
42,988,804✔
30
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
42,988,804✔
31
  if (pInfo->pVloads == NULL) return;
42,988,804✔
32

33
  tfsUpdateSize(pMgmt->pTfs);
42,988,804✔
34

35
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
42,988,804✔
36

37
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
42,988,804✔
38
  while (pIter) {
150,429,706✔
39
    SVnodeObj **ppVnode = pIter;
107,440,902✔
40
    if (ppVnode == NULL || *ppVnode == NULL) continue;
107,440,902✔
41

42
    SVnodeObj *pVnode = *ppVnode;
107,440,902✔
43
    SVnodeLoad vload = {.vgId = pVnode->vgId};
107,440,902✔
44
    if (!pVnode->failed) {
107,440,902✔
45
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
107,440,902✔
46
        dError("failed to get vnode load");
×
47
      }
48
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
107,440,902✔
49
    }
50
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
214,881,804✔
51
      dError("failed to push vnode load");
×
52
    }
53
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
107,440,902✔
54
  }
55

56
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
42,988,804✔
57
}
58

59
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
×
60
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
61

62
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
63
  while (pIter) {
×
64
    SVnodeObj **ppVnode = pIter;
×
65
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
66

67
    SVnodeObj *pVnode = *ppVnode;
×
68

69
    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
×
70
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
×
71
    }
72
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
73
  }
74

75
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
76
}
×
77

78
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
79
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
80
  if (!pInfo->pVloads) return;
×
81

82
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
83

84
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
85
  while (pIter) {
×
86
    SVnodeObj **ppVnode = pIter;
×
87
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
88

89
    SVnodeObj *pVnode = *ppVnode;
×
90
    if (!pVnode->failed) {
×
91
      SVnodeLoadLite vload = {0};
×
92
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
93
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
94
          taosArrayDestroy(pInfo->pVloads);
×
95
          pInfo->pVloads = NULL;
×
96
          break;
×
97
        }
98
      }
99
    }
100
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
101
  }
102

103
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
104
}
105

106
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
122✔
107
  SMonVloadInfo vloads = {0};
122✔
108
  vmGetVnodeLoads(pMgmt, &vloads, true);
122✔
109

110
  SArray *pVloads = vloads.pVloads;
122✔
111
  if (pVloads == NULL) return;
122✔
112

113
  int32_t totalVnodes = 0;
122✔
114
  int32_t masterNum = 0;
122✔
115
  int64_t numOfSelectReqs = 0;
122✔
116
  int64_t numOfInsertReqs = 0;
122✔
117
  int64_t numOfInsertSuccessReqs = 0;
122✔
118
  int64_t numOfBatchInsertReqs = 0;
122✔
119
  int64_t numOfBatchInsertSuccessReqs = 0;
122✔
120

121
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
366✔
122
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
244✔
123
    numOfSelectReqs += pLoad->numOfSelectReqs;
244✔
124
    numOfInsertReqs += pLoad->numOfInsertReqs;
244✔
125
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
244✔
126
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
244✔
127
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
244✔
128
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
244✔
129
      masterNum++;
244✔
130
    }
131
    totalVnodes++;
244✔
132
  }
133

134
  pInfo->vstat.totalVnodes = totalVnodes;
122✔
135
  pInfo->vstat.masterNum = masterNum;
122✔
136
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
122✔
137
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
122✔
138
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
122✔
139
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
122✔
140
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
122✔
141
  pMgmt->state.totalVnodes = totalVnodes;
122✔
142
  pMgmt->state.masterNum = masterNum;
122✔
143
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
122✔
144
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
122✔
145
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
122✔
146
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
122✔
147
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
122✔
148

149
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
122✔
150
    dError("failed to get tfs monitor info");
×
151
  }
152
  taosArrayDestroy(pVloads);
122✔
153
}
154

155
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
122✔
156
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
122✔
157
  if (list_size == 0) return;
122✔
158
  int32_t *vgroup_ids;
×
159
  char   **keys;
×
160
  int      r = 0;
×
161
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
162
  if (r) {
×
163
    dError("failed to get vgroup ids");
×
164
    return;
×
165
  }
166
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
167
  for (int i = 0; i < list_size; i++) {
×
168
    int32_t vgroup_id = vgroup_ids[i];
×
169
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
170
    if (vnode == NULL) {
×
171
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
172
      if (r) {
×
173
        dError("failed to delete monitor sample key:%s", keys[i]);
×
174
      }
175
    }
176
  }
177
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
178
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
179
  if (keys) taosMemoryFree(keys);
×
180
  return;
×
181
}
182

183
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
184
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
185
    return;
×
186
  }
187

188
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
189
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
190
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
191
  if (pValidVgroups == NULL) {
×
192
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
193
    return;
×
194
  }
195

196
  while (pIter != NULL) {
×
197
    SVnodeObj **ppVnode = pIter;
×
198
    if (ppVnode && *ppVnode) {
×
199
      int32_t vgId = (*ppVnode)->vgId;
×
200
      char    dummy = 1;  // hash table value (we only care about the key)
×
201
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
202
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
203
      }
204
    }
205
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
206
  }
207
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
208

209
  // Clean expired metrics by removing metrics for non-existent vgroups
210
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
211
  if (code != TSDB_CODE_SUCCESS) {
×
212
    dError("failed to clean expired metrics, code:%d", code);
×
213
  }
214

215
  taosHashCleanup(pValidVgroups);
×
216
}
217

218
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
2,958,012✔
219
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
2,958,012✔
220

221
  pCfg->vgId = pCreate->vgId;
2,958,012✔
222
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
2,957,780✔
223
  pCfg->dbId = pCreate->dbUid;
2,957,780✔
224
  pCfg->szPage = pCreate->pageSize * 1024;
2,957,780✔
225
  pCfg->szCache = pCreate->pages;
2,957,637✔
226
  pCfg->cacheLast = pCreate->cacheLast;
2,957,726✔
227
  pCfg->cacheLastSize = pCreate->cacheLastSize;
2,957,726✔
228
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
2,957,726✔
229
  pCfg->isWeak = true;
2,957,958✔
230
  pCfg->isTsma = pCreate->isTsma;
2,956,801✔
231
  pCfg->tsdbCfg.compression = pCreate->compression;
2,958,012✔
232
  pCfg->tsdbCfg.precision = pCreate->precision;
2,957,306✔
233
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
2,956,491✔
234
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
2,956,213✔
235
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
2,956,115✔
236
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
2,954,904✔
237
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
2,956,571✔
238
  pCfg->tsdbCfg.minRows = pCreate->minRows;
2,953,998✔
239
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
2,957,193✔
240
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
241
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
2,955,004✔
242
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
2,956,367✔
243
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1,630✔
244
  }
245
#else
246
  pCfg->tsdbCfg.encryptAlgorithm = 0;
247
#endif
248

249
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
2,952,956✔
250
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
2,954,472✔
251
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
2,955,359✔
252
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
2,953,329✔
253
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
2,955,704✔
254
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
2,953,084✔
255
  pCfg->walCfg.level = pCreate->walLevel;
2,955,701✔
256
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
257
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
2,952,350✔
258
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
2,952,744✔
259
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1,630✔
260
  }
261
#else
262
  pCfg->walCfg.encryptAlgorithm = 0;
263
#endif
264

265
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
266
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
2,951,143✔
267
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
2,954,399✔
268
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1,630✔
269
  }
270
#else
271
  pCfg->tdbEncryptAlgorithm = 0;
272
#endif
273

274
  pCfg->sttTrigger = pCreate->sstTrigger;
2,954,715✔
275
  pCfg->hashBegin = pCreate->hashBegin;
2,955,061✔
276
  pCfg->hashEnd = pCreate->hashEnd;
2,952,662✔
277
  pCfg->hashMethod = pCreate->hashMethod;
2,950,403✔
278
  pCfg->hashPrefix = pCreate->hashPrefix;
2,954,248✔
279
  pCfg->hashSuffix = pCreate->hashSuffix;
2,949,831✔
280
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
2,952,629✔
281

282
  pCfg->ssChunkSize = pCreate->ssChunkSize;
2,951,400✔
283
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
2,951,041✔
284
  pCfg->ssCompact = pCreate->ssCompact;
2,949,851✔
285

286
  pCfg->standby = 0;
2,951,953✔
287
  pCfg->syncCfg.replicaNum = 0;
2,953,896✔
288
  pCfg->syncCfg.totalReplicaNum = 0;
2,953,169✔
289
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
2,952,836✔
290

291
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
2,953,475✔
292
  for (int32_t i = 0; i < pCreate->replica; ++i) {
7,074,483✔
293
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
4,120,159✔
294
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
4,115,475✔
295
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
4,115,521✔
296
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
4,115,099✔
297
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
4,117,110✔
298
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
4,118,015✔
299
    pCfg->syncCfg.replicaNum++;
4,120,803✔
300
  }
301
  if (pCreate->selfIndex != -1) {
2,956,627✔
302
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
2,860,110✔
303
  }
304
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
3,047,239✔
305
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
95,944✔
306
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
95,944✔
307
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
95,944✔
308
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
95,944✔
309
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
95,944✔
310
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
95,944✔
311
    pCfg->syncCfg.totalReplicaNum++;
95,944✔
312
  }
313
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
2,953,377✔
314
  if (pCreate->learnerSelfIndex != -1) {
2,951,283✔
315
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
95,944✔
316
  }
317
}
2,953,197✔
318

319
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
2,951,845✔
320
  pCfg->vgId = pCreate->vgId;
2,951,845✔
321
  pCfg->vgVersion = pCreate->vgVersion;
2,953,538✔
322
  pCfg->dropped = 0;
2,952,966✔
323
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
2,952,055✔
324
}
2,954,445✔
325

326
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,946,683✔
327
  SCreateVnodeReq req = {0};
2,946,683✔
328
  SVnodeCfg       vnodeCfg = {0};
2,954,378✔
329
  SWrapperCfg     wrapperCfg = {0};
2,953,098✔
330
  int32_t         code = -1;
2,953,795✔
331
  char            path[TSDB_FILENAME_LEN] = {0};
2,953,795✔
332

333
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,954,540✔
334
    return TSDB_CODE_INVALID_MSG;
×
335
  }
336

337
  if (req.learnerReplica == 0) {
2,952,564✔
338
    req.learnerSelfIndex = -1;
2,857,406✔
339
  }
340

341
  dInfo(
2,952,564✔
342
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
343
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
344
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
345
      "precision:%d compression:%d minRows:%d maxRows:%d"
346
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
347
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
348
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
349
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
350
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
351
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
352
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
353
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
354
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
355
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
356
      req.encryptAlgorithm);
357

358
  for (int32_t i = 0; i < req.replica; ++i) {
7,075,107✔
359
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
4,118,855✔
360
          req.replicas[i].id);
361
  }
362
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
3,052,196✔
363
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
95,944✔
364
          req.learnerReplicas[i].port, req.replicas[i].id);
365
  }
366

367
  SReplica *pReplica = NULL;
2,956,252✔
368
  if (req.selfIndex != -1) {
2,956,252✔
369
    pReplica = &req.replicas[req.selfIndex];
2,859,548✔
370
  } else {
371
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
96,704✔
372
  }
373
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
2,955,492✔
374
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
2,955,492✔
UNCOV
375
    (void)tFreeSCreateVnodeReq(&req);
×
376

377
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
378
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
×
379
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
380
    return code;
×
381
  }
382

383
  if (req.encryptAlgorithm == DND_CA_SM4) {
2,955,492✔
384
    if (strlen(tsEncryptKey) == 0) {
1,630✔
385
      (void)tFreeSCreateVnodeReq(&req);
×
386
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
387
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
388
      return code;
×
389
    }
390
  }
391

392
  vmGenerateVnodeCfg(&req, &vnodeCfg);
2,955,492✔
393

394
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
2,950,698✔
395

396
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
2,954,010✔
397
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
2,953,544✔
398
    dError("vgId:%d, already exist", req.vgId);
42,695✔
399
    (void)tFreeSCreateVnodeReq(&req);
42,695✔
400
    vmReleaseVnode(pMgmt, pVnode);
42,695✔
401
    code = TSDB_CODE_VND_ALREADY_EXIST;
42,695✔
402
    return 0;
42,695✔
403
  }
404

405
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
2,910,849✔
406
  if (diskPrimary < 0) {
2,909,237✔
407
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
2,910,697✔
408
  }
409
  wrapperCfg.diskPrimary = diskPrimary;
2,911,337✔
410

411
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
2,911,337✔
412

413
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
2,911,337✔
414
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
415
    vmReleaseVnode(pMgmt, pVnode);
×
416
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
417
    (void)tFreeSCreateVnodeReq(&req);
×
418
    return code;
×
419
  }
420

421
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
2,912,797✔
422
  if (pImpl == NULL) {
2,912,797✔
423
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
424
    code = terrno != 0 ? terrno : -1;
×
425
    goto _OVER;
×
426
  }
427

428
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
2,912,797✔
429
  if (code != 0) {
2,912,797✔
430
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
431
    code = terrno != 0 ? terrno : code;
×
432
    goto _OVER;
×
433
  }
434

435
  code = vnodeStart(pImpl);
2,912,797✔
436
  if (code != 0) {
2,912,797✔
437
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
438
    goto _OVER;
×
439
  }
440

441
  code = vmWriteVnodeListToFile(pMgmt);
2,912,797✔
442
  if (code != 0) {
2,912,797✔
443
    code = terrno != 0 ? terrno : code;
×
444
    goto _OVER;
×
445
  }
446

447
_OVER:
2,912,797✔
448
  vmCleanPrimaryDisk(pMgmt, req.vgId);
2,912,797✔
449

450
  if (code != 0) {
2,912,797✔
451
    vmCloseFailedVnode(pMgmt, req.vgId);
×
452

453
    vnodeClose(pImpl);
×
454
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
455
  } else {
456
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
2,912,797✔
457
          TMSG_INFO(pMsg->msgType));
458
  }
459

460
  (void)tFreeSCreateVnodeReq(&req);
2,912,797✔
461
  terrno = code;
2,912,797✔
462
  return code;
2,912,797✔
463
}
464

465
#ifdef USE_MOUNT
466
typedef struct {
467
  int64_t dbId;
468
  int32_t vgId;
469
  int32_t diskPrimary;
470
} SMountDbVgId;
471
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
472
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
473

474
static int compareVnodeInfo(const void *p1, const void *p2) {
6,629✔
475
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
6,629✔
476
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
6,629✔
477

478
  if (v1->config.dbId == v2->config.dbId) {
6,629✔
479
    if (v1->config.vgId == v2->config.vgId) {
3,788✔
480
      return 0;
×
481
    }
482
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
3,788✔
483
  }
484

485
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
2,841✔
486
}
487
static int compareVgDiskPrimary(const void *p1, const void *p2) {
6,629✔
488
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
6,629✔
489
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
6,629✔
490

491
  if (v1->dbId == v2->dbId) {
6,629✔
492
    if (v1->vgId == v2->vgId) {
3,788✔
493
      return 0;
×
494
    }
495
    return v1->vgId > v2->vgId ? 1 : -1;
3,788✔
496
  }
497

498
  return v1->dbId > v2->dbId ? 1 : -1;
2,841✔
499
}
500

501
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
947✔
502
  int32_t   code = 0, lino = 0;
947✔
503
  TdFilePtr pFile = NULL;
947✔
504
  char     *content = NULL;
947✔
505
  SJson    *pJson = NULL;
947✔
506
  int64_t   size = 0;
947✔
507
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
947✔
508
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
947✔
509
  SArray   *pDisks = NULL;
947✔
510
  // step 1: fetch clusterId from dnode.json
511
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
947✔
512
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
947✔
513
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
947✔
514
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
947✔
515
  if (taosReadFile(pFile, content, size) != size) {
947✔
516
    TAOS_CHECK_EXIT(terrno);
×
517
  }
518
  content[size] = '\0';
947✔
519
  pJson = tjsonParse(content);
947✔
520
  if (pJson == NULL) {
947✔
521
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
522
  }
523
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
947✔
524
  TAOS_CHECK_EXIT(code);
947✔
525
  if (dropped == 1) {
947✔
526
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
527
  }
528
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
947✔
529
  TAOS_CHECK_EXIT(code);
947✔
530
  if (encryptScope != 0) {
947✔
531
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
532
  }
533
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
947✔
534
  TAOS_CHECK_EXIT(code);
947✔
535
  if (clusterId == 0) {
947✔
536
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
537
  }
538
  pMountInfo->clusterId = clusterId;
947✔
539
  if (content != NULL) taosMemoryFreeClear(content);
947✔
540
  if (pJson != NULL) {
947✔
541
    cJSON_Delete(pJson);
947✔
542
    pJson = NULL;
947✔
543
  }
544
  if (pFile != NULL) taosCloseFile(&pFile);
947✔
545
  // step 2: fetch dataDir from dnode/config/local.json
546
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
947✔
547
  int32_t nDisks = taosArrayGetSize(pDisks);
947✔
548
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
947✔
549
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
550
  }
551
  for (int32_t i = 0; i < nDisks; ++i) {
6,629✔
552
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
5,682✔
553
    if (!pMountInfo->pDisks[pDisk->level]) {
5,682✔
554
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
1,894✔
555
      if (!pMountInfo->pDisks[pDisk->level]) {
1,894✔
556
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
557
      }
558
    }
559
    char *pDir = taosStrdup(pDisk->dir);
5,682✔
560
    if (pDir == NULL) {
5,682✔
561
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
562
    }
563
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
5,682✔
564
      // put the primary disk to the first position of level 0
565
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
947✔
566
        taosMemFree(pDir);
×
567
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
568
      }
569
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
9,470✔
570
      taosMemFree(pDir);
×
571
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
572
    }
573
  }
574
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
3,788✔
575
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
2,841✔
576
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
2,841✔
577
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
578
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
579
    }
580
  }
581
_exit:
947✔
582
  if (content != NULL) taosMemoryFreeClear(content);
947✔
583
  if (pJson != NULL) cJSON_Delete(pJson);
947✔
584
  if (pFile != NULL) taosCloseFile(&pFile);
947✔
585
  if (pDisks != NULL) {
947✔
586
    taosArrayDestroy(pDisks);
947✔
587
    pDisks = NULL;
947✔
588
  }
589
  if (code != 0) {
947✔
590
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
591
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
592
  } else {
593
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
947✔
594
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
595
  }
596
  TAOS_RETURN(code);
947✔
597
}
598

599
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
947✔
600
  int32_t       code = 0, lino = 0;
947✔
601
  SWrapperCfg  *pCfgs = NULL;
947✔
602
  int32_t       numOfVnodes = 0;
947✔
603
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
947✔
604
  TdDirPtr      pDir = NULL;
947✔
605
  TdDirEntryPtr de = NULL;
947✔
606
  SVnodeMgmt    vnodeMgmt = {0};
947✔
607
  SArray       *pVgCfgs = NULL;
947✔
608
  SArray       *pDbInfos = NULL;
947✔
609
  SArray       *pDiskPrimarys = NULL;
947✔
610

611
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
947✔
612
  vnodeMgmt.path = path;
947✔
613
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
947✔
614
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
947✔
615
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
947✔
616
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
947✔
617

618
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
947✔
619
  int32_t nVgDropped = 0, j = 0;
947✔
620
  for (int32_t i = 0; i < numOfVnodes; ++i) {
4,735✔
621
    SWrapperCfg *pCfg = &pCfgs[i];
3,788✔
622
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
623
    if (nDiskLevel0 > 1) {
3,788✔
624
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
3,788✔
625
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
3,788✔
626
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
3,788✔
627
                     pCfg->vgId);
628
    }
629
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
3,788✔
630
    if (pCfg->dropped) {
3,788✔
631
      ++nVgDropped;
×
632
      continue;
×
633
    }
634
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
3,788✔
635
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
636
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
637
    }
638
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
3,788✔
639
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
3,788✔
640
    if (pInfo->config.syncCfg.replicaNum > 1) {
3,788✔
641
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
642
             pInfo->config.syncCfg.replicaNum);
643
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
644
    } else if (pInfo->config.vgId != pCfg->vgId) {
3,788✔
645
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
646
             pCfg->vgId);
647
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
648
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
3,788✔
649
               pInfo->config.walCfg.encryptAlgorithm) {
3,788✔
650
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
×
651
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
652
             pInfo->config.tsdbCfg.encryptAlgorithm);
653
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
654
    }
655
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
3,788✔
656
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
3,788✔
657
  }
658
  if (nVgDropped > 0) {
947✔
659
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
660
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
661
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
662
  }
663
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
947✔
664
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
947✔
665
  if (nVgCfg != nDiskPrimary) {
947✔
666
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
667
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
668
  }
669
  if (nVgCfg > 1) {
947✔
670
    taosArraySort(pVgCfgs, compareVnodeInfo);
947✔
671
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
947✔
672
  }
673

674
  int64_t clusterId = pMountInfo->clusterId;
947✔
675
  int64_t dbId = 0, vgId = 0, nDb = 0;
947✔
676
  for (int32_t i = 0; i < nVgCfg; ++i) {
3,467✔
677
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
2,837✔
678
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
2,837✔
679
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
317✔
680
             pInfo->config.syncCfg.nodeInfo->clusterId);
681
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
317✔
682
    }
683
    if (dbId != pInfo->config.dbId) {
2,520✔
684
      dbId = pInfo->config.dbId;
1,260✔
685
      ++nDb;
1,260✔
686
    }
687
    if (vgId == pInfo->config.vgId) {
2,520✔
688
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
689
    } else {
690
      vgId = pInfo->config.vgId;
2,520✔
691
    }
692
  }
693

694
  if (nDb > 0) {
630✔
695
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
630✔
696
    int32_t dbIdx = -1;
630✔
697
    for (int32_t i = 0; i < nVgCfg; ++i) {
3,150✔
698
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
2,520✔
699
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
2,520✔
700
      SMountDbInfo *pDbInfo = NULL;
2,520✔
701
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
2,520✔
702
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
1,260✔
703
        pDbInfo->dbId = pVgCfg->config.dbId;
1,260✔
704
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
1,260✔
705
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
1,260✔
706
      } else {
707
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
1,260✔
708
      }
709
      SMountVgInfo vgInfo = {
2,520✔
710
          .diskPrimary = pDiskPrimary->diskPrimary,
2,520✔
711
          .vgId = pVgCfg->config.vgId,
2,520✔
712
          .dbId = pVgCfg->config.dbId,
2,520✔
713
          .cacheLastSize = pVgCfg->config.cacheLastSize,
2,520✔
714
          .szPage = pVgCfg->config.szPage,
2,520✔
715
          .szCache = pVgCfg->config.szCache,
2,520✔
716
          .szBuf = pVgCfg->config.szBuf,
2,520✔
717
          .cacheLast = pVgCfg->config.cacheLast,
2,520✔
718
          .standby = pVgCfg->config.standby,
2,520✔
719
          .hashMethod = pVgCfg->config.hashMethod,
2,520✔
720
          .hashBegin = pVgCfg->config.hashBegin,
2,520✔
721
          .hashEnd = pVgCfg->config.hashEnd,
2,520✔
722
          .hashPrefix = pVgCfg->config.hashPrefix,
2,520✔
723
          .hashSuffix = pVgCfg->config.hashSuffix,
2,520✔
724
          .sttTrigger = pVgCfg->config.sttTrigger,
2,520✔
725
          .replications = pVgCfg->config.syncCfg.replicaNum,
2,520✔
726
          .precision = pVgCfg->config.tsdbCfg.precision,
2,520✔
727
          .compression = pVgCfg->config.tsdbCfg.compression,
2,520✔
728
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
2,520✔
729
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
2,520✔
730
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
2,520✔
731
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
2,520✔
732
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
2,520✔
733
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
2,520✔
734
          .minRows = pVgCfg->config.tsdbCfg.minRows,
2,520✔
735
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
2,520✔
736
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
2,520✔
737
          .ssChunkSize = pVgCfg->config.ssChunkSize,
2,520✔
738
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
2,520✔
739
          .ssCompact = pVgCfg->config.ssCompact,
2,520✔
740
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
2,520✔
741
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
2,520✔
742
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
2,520✔
743
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
2,520✔
744
          .walSegSize = pVgCfg->config.walCfg.segSize,
2,520✔
745
          .walLevel = pVgCfg->config.walCfg.level,
2,520✔
746
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
2,520✔
747
          .committed = pVgCfg->state.committed,
2,520✔
748
          .commitID = pVgCfg->state.commitID,
2,520✔
749
          .commitTerm = pVgCfg->state.commitTerm,
2,520✔
750
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
2,520✔
751
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
2,520✔
752
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
2,520✔
753
      };
754
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
5,040✔
755
    }
756
  }
757

758
  pMountInfo->pDbs = pDbInfos;
630✔
759

760
_exit:
947✔
761
  if (code != 0) {
947✔
762
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
317✔
763
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
764
  }
765
  taosArrayDestroy(pDiskPrimarys);
947✔
766
  taosArrayDestroy(pVgCfgs);
947✔
767
  taosMemoryFreeClear(pCfgs);
947✔
768
  TAOS_RETURN(code);
947✔
769
}
770

771
/**
772
 *   Retrieve the stables from vnode meta.
773
 */
774
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
630✔
775
  int32_t code = 0, lino = 0;
630✔
776
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
630✔
777
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
630✔
778
  SArray *suidList = NULL;
630✔
779
  SArray *pCols = NULL;
630✔
780
  SArray *pTags = NULL;
630✔
781
  SArray *pColExts = NULL;
630✔
782
  SArray *pTagExts = NULL;
630✔
783

784
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
630✔
785
  for (int32_t i = 0; i < nDb; ++i) {
1,890✔
786
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
1,260✔
787
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
1,260✔
788
    for (int32_t j = 0; j < nVg; ++j) {
1,260✔
789
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
1,260✔
790
      SVnode        vnode = {
1,260✔
791
                 .config.vgId = pVgInfo->vgId,
1,260✔
792
                 .config.dbId = pVgInfo->dbId,
1,260✔
793
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
1,260✔
794
                 .config.szPage = pVgInfo->szPage,
1,260✔
795
                 .config.szCache = pVgInfo->szCache,
1,260✔
796
                 .config.szBuf = pVgInfo->szBuf,
1,260✔
797
                 .config.cacheLast = pVgInfo->cacheLast,
1,260✔
798
                 .config.standby = pVgInfo->standby,
1,260✔
799
                 .config.hashMethod = pVgInfo->hashMethod,
1,260✔
800
                 .config.hashBegin = pVgInfo->hashBegin,
1,260✔
801
                 .config.hashEnd = pVgInfo->hashEnd,
1,260✔
802
                 .config.hashPrefix = pVgInfo->hashPrefix,
1,260✔
803
                 .config.hashSuffix = pVgInfo->hashSuffix,
1,260✔
804
                 .config.sttTrigger = pVgInfo->sttTrigger,
1,260✔
805
                 .config.syncCfg.replicaNum = pVgInfo->replications,
1,260✔
806
                 .config.tsdbCfg.precision = pVgInfo->precision,
1,260✔
807
                 .config.tsdbCfg.compression = pVgInfo->compression,
1,260✔
808
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
1,260✔
809
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
1,260✔
810
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
1,260✔
811
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
1,260✔
812
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
1,260✔
813
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
1,260✔
814
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
1,260✔
815
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
1,260✔
816
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
1,260✔
817
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
1,260✔
818
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
1,260✔
819
                 .config.ssCompact = pVgInfo->ssCompact,
1,260✔
820
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
1,260✔
821
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
1,260✔
822
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
1,260✔
823
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
1,260✔
824
                 .config.walCfg.segSize = pVgInfo->walSegSize,
1,260✔
825
                 .config.walCfg.level = pVgInfo->walLevel,
1,260✔
826
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
1,260✔
827
                 .diskPrimary = pVgInfo->diskPrimary,
1,260✔
828
      };
829
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
1,260✔
830
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
1,260✔
831
               pVgInfo->vgId);
832
      vnode.path = path;
1,260✔
833

834
      int32_t rollback = vnodeShouldRollback(&vnode);
1,260✔
835
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
1,260✔
836
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
837
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
838
        TAOS_CHECK_EXIT(code);
×
839
      } else {
840
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
1,260✔
841
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
842

843
        SMetaReader mr = {0};
1,260✔
844
        tb_uid_t    suid = 0;
1,260✔
845
        SMeta      *pMeta = vnode.pMeta;
1,260✔
846

847
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
1,260✔
848
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
1,260✔
849
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
850
        }
851
        taosArrayClear(suidList);
1,260✔
852
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
1,260✔
853
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,260✔
854
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
855
        int32_t nStbs = taosArrayGetSize(suidList);
1,260✔
856
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
1,260✔
857
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
858
        }
859
        for (int32_t i = 0; i < nStbs; ++i) {
5,040✔
860
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
3,780✔
861
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
3,780✔
862
                pVgInfo->dbId, suid, pReq->dnodeId);
863
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
3,780✔
864
            TSDB_CHECK_CODE(code, lino, _exit0);
×
865
          }
866
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
3,780✔
867
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
3,780✔
868
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
869
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
870
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
871
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
872
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
873
          }
874
          SMountStbInfo stbInfo = {
3,780✔
875
              .req.source = TD_REQ_FROM_APP,
876
              .req.suid = suid,
877
              .req.colVer = mr.me.stbEntry.schemaRow.version,
3,780✔
878
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
3,780✔
879
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
3,780✔
880
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
3,780✔
881
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
3,780✔
882
          };
883
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
3,780✔
884
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
3,780✔
885
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
886
          }
887
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
3,780✔
888
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
889
          }
890

891
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
3,780✔
892
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
893
          }
894
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
3,780✔
895
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
896
          }
897
          taosArrayClear(pCols);
3,780✔
898
          taosArrayClear(pTags);
3,780✔
899
          taosArrayClear(pColExts);
3,780✔
900
          taosArrayClear(pTagExts);
3,780✔
901
          stbInfo.req.pColumns = pCols;
3,780✔
902
          stbInfo.req.pTags = pTags;
3,780✔
903
          stbInfo.pColExts = pColExts;
3,780✔
904
          stbInfo.pTagExts = pTagExts;
3,780✔
905

906
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
22,680✔
907
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
18,900✔
908
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
18,900✔
909
            SFieldWithOptions col = {
18,900✔
910
                .type = pSchema->type,
18,900✔
911
                .flags = pSchema->flags,
18,900✔
912
                .bytes = pSchema->bytes,
18,900✔
913
                .compress = pColComp->alg,
18,900✔
914
            };
915
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
18,900✔
916
            if (pSchema->colId != pColComp->id) {
18,900✔
917
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
918
            }
919
            if (mr.me.pExtSchemas) {
18,900✔
920
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
921
            }
922
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
18,900✔
923
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
37,800✔
924
          }
925
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
8,820✔
926
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
5,040✔
927
            SField   tag = {
5,040✔
928
                  .type = pSchema->type,
5,040✔
929
                  .flags = pSchema->flags,
5,040✔
930
                  .bytes = pSchema->bytes,
5,040✔
931
            };
932
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
5,040✔
933
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
5,040✔
934
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
10,080✔
935
          }
936
          tDecoderClear(&mr.coder);
3,780✔
937

938
          // serialize the SMountStbInfo
939
          int32_t firstPartLen = 0;
3,780✔
940
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
3,780✔
941
          if (msgLen <= 0) {
3,780✔
942
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
943
          }
944
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
3,780✔
945
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
3,780✔
946
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
3,780✔
947
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
3,780✔
948
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
3,780✔
949
            taosMemoryFree(pBuf);
×
950
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
951
          }
952
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
7,560✔
953
            taosMemoryFree(pBuf);
×
954
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
955
          }
956
        }
957
      _exit0:
1,260✔
958
        metaReaderClear(&mr);
1,260✔
959
        metaClose(&vnode.pMeta);
1,260✔
960
        TAOS_CHECK_EXIT(code);
1,260✔
961
      }
962
      break;  // retrieve stbs from one vnode is enough
1,260✔
963
    }
964
  }
965
_exit:
630✔
966
  if (code != 0) {
630✔
967
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
968
           pReq->dnodeId, tstrerror(code), path);
969
  }
970
  taosArrayDestroy(suidList);
630✔
971
  taosArrayDestroy(pCols);
630✔
972
  taosArrayDestroy(pTags);
630✔
973
  taosArrayDestroy(pColExts);
630✔
974
  taosArrayDestroy(pTagExts);
630✔
975
  TAOS_RETURN(code);
630✔
976
}
977

978
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
2,839✔
979
  int32_t code = 0, lino = 0;
2,839✔
980
  int32_t retryTimes = 0;
2,839✔
981
  char    filepath[PATH_MAX] = {0};
2,839✔
982
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
2,839✔
983
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
2,839✔
984
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
985
                  code, lino, _exit, terrno);
986
  int32_t ret = 0;
2,839✔
987
  do {
988
    ret = taosLockFile(*pFile);
3,473✔
989
    if (ret == 0) break;
3,473✔
990
    taosMsleep(1000);
951✔
991
    ++retryTimes;
951✔
992
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
951✔
993
  } while (retryTimes < retryLimit);
951✔
994
  TAOS_CHECK_EXIT(ret);
2,839✔
995
_exit:
2,839✔
996
  if (code != 0) {
2,839✔
997
    (void)taosCloseFile(pFile);
317✔
998
    *pFile = NULL;
317✔
999
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
317✔
1000
           filepath);
1001
  }
1002
  TAOS_RETURN(code);
2,839✔
1003
}
1004

1005
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
1,898✔
1006
  int32_t code = 0, lino = 0;
1,898✔
1007
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
1,898✔
1008
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
1,898✔
1009
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
1,581✔
1010
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
1,264✔
1011
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
1,264✔
1012
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
947✔
1013
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
947✔
1014
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
947✔
1015
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
947✔
1016
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
947✔
1017
           TD_DIRSEP);
1018
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
947✔
1019
_exit:
1,898✔
1020
  if (code != 0) {
1,898✔
1021
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
951✔
1022
           pReq->dnodeId, tstrerror(code), path);
1023
  }
1024
  TAOS_RETURN(code);
1,898✔
1025
}
1026

1027
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
1,898✔
1028
                                       SMountInfo *pMountInfo) {
1029
  int32_t code = 0, lino = 0;
1,898✔
1030
  pMountInfo->dnodeId = pReq->dnodeId;
1,898✔
1031
  pMountInfo->mountUid = pReq->mountUid;
1,898✔
1032
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
1,898✔
1033
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
1,898✔
1034
  pMountInfo->ignoreExist = pReq->ignoreExist;
1,898✔
1035
  pMountInfo->valLen = pReq->valLen;
1,898✔
1036
  pMountInfo->pVal = pReq->pVal;
1,898✔
1037
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
1,898✔
1038
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
947✔
1039
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
947✔
1040
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
630✔
1041
_exit:
630✔
1042
  if (code != 0) {
1,898✔
1043
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
1,268✔
1044
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1045
  }
1046
  TAOS_RETURN(code);
1,898✔
1047
}
1048

1049
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,898✔
1050
  int32_t               code = 0, lino = 0;
1,898✔
1051
  int32_t               rspCode = 0;
1,898✔
1052
  SVnodeMgmt            vndMgmt = {0};
1,898✔
1053
  SMountInfo            mountInfo = {0};
1,898✔
1054
  void                 *pBuf = NULL;
1,898✔
1055
  int32_t               bufLen = 0;
1,898✔
1056
  SRetrieveMountPathReq req = {0};
1,898✔
1057

1058
  vndMgmt = *pMgmt;
1,898✔
1059
  vndMgmt.path = NULL;
1,898✔
1060
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
1,898✔
1061
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
1,898✔
1062
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
1,898✔
1063
_end:
1,898✔
1064
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
1,898✔
1065
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
1,898✔
1066
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
1,898✔
1067
  pMsg->info.rsp = pBuf;
1,898✔
1068
  pMsg->info.rspLen = bufLen;
1,898✔
1069
_exit:
1,898✔
1070
  if (rspCode != 0) {
1,898✔
1071
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1072
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1073
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1074
    rpcFreeCont(pBuf);
×
1075
    code = rspCode;
×
1076
  } else if (code != 0) {
1,898✔
1077
    // the client would receive the response with error msg
1078
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
1,268✔
1079
           req.dnodeId, tstrerror(code), req.mountPath);
1080
  } else {
1081
    int32_t nVgs = 0;
630✔
1082
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
630✔
1083
    for (int32_t i = 0; i < nDbs; ++i) {
1,890✔
1084
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
1,260✔
1085
      nVgs += taosArrayGetSize(pDb->pVgs);
1,260✔
1086
    }
1087
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
630✔
1088
  }
1089
  taosMemFreeClear(vndMgmt.path);
1,898✔
1090
  tFreeMountInfo(&mountInfo, false);
1,898✔
1091
  TAOS_RETURN(code);
1,898✔
1092
}
1093

1094
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
2,520✔
1095
                            SMountVnodeReq *req, STfs *pMountTfs) {
1096
  int32_t    code = 0;
2,520✔
1097
  SVnodeInfo info = {0};
2,520✔
1098
  char       hostDir[TSDB_FILENAME_LEN] = {0};
2,520✔
1099
  char       mountDir[TSDB_FILENAME_LEN] = {0};
2,520✔
1100
  char       mountVnode[32] = {0};
2,520✔
1101

1102
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
2,520✔
1103
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1104
    return code;
×
1105
  }
1106

1107
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
2,520✔
1108
  if ((code = taosMkDir(hostDir))) {
2,520✔
1109
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1110
           tstrerror(code), hostDir);
1111
    return code;
×
1112
  }
1113

1114
  info.config = *pCfg;  // copy the config
2,520✔
1115
  info.state.committed = req->committed;
2,520✔
1116
  info.state.commitID = req->commitID;
2,520✔
1117
  info.state.commitTerm = req->commitTerm;
2,520✔
1118
  info.state.applied = req->committed;
2,520✔
1119
  info.state.applyTerm = req->commitTerm;
2,520✔
1120
  info.config.vndStats.numOfSTables = req->numOfSTables;
2,520✔
1121
  info.config.vndStats.numOfCTables = req->numOfCTables;
2,520✔
1122
  info.config.vndStats.numOfNTables = req->numOfNTables;
2,520✔
1123

1124
  SVnodeInfo oldInfo = {0};
2,520✔
1125
  oldInfo.config = vnodeCfgDefault;
2,520✔
1126
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
2,520✔
1127
    if (oldInfo.config.dbId != info.config.dbId) {
×
1128
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1129
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1130
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1131
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1132
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1133
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1134

1135
    } else {
1136
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1137
    }
1138
    return code;
×
1139
  }
1140

1141
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
2,520✔
1142
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
2,520✔
1143
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
2,520✔
1144
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
2,520✔
1145
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1146
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
15,120✔
1147
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
12,600✔
1148
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
12,600✔
1149
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
12,600✔
1150
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1151
             mountSubDir, hostSubDir, tstrerror(code));
1152
      return code;
×
1153
    }
1154
  }
1155
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
2,520✔
1156
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
2,520✔
1157
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1158
           req->mountName, tstrerror(code), hostDir);
1159
    return code;
×
1160
  }
1161
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
2,520✔
1162
  return 0;
2,520✔
1163
}
1164

1165
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,520✔
1166
  int32_t          code = 0, lino = 0;
2,520✔
1167
  SMountVnodeReq   req = {0};
2,520✔
1168
  SCreateVnodeReq *pCreateReq = &req.createReq;
2,520✔
1169
  SVnodeCfg        vnodeCfg = {0};
2,520✔
1170
  SWrapperCfg      wrapperCfg = {0};
2,520✔
1171
  SVnode          *pImpl = NULL;
2,520✔
1172
  STfs            *pMountTfs = NULL;
2,520✔
1173
  char             path[TSDB_FILENAME_LEN] = {0};
2,520✔
1174
  bool             releaseTfs = false;
2,520✔
1175

1176
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,520✔
1177
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1178
    return TSDB_CODE_INVALID_MSG;
×
1179
  }
1180

1181
  if (pCreateReq->learnerReplica == 0) {
2,520✔
1182
    pCreateReq->learnerSelfIndex = -1;
2,520✔
1183
  }
1184
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
5,040✔
1185
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
2,520✔
1186
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1187
  }
1188
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
2,520✔
1189
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1190
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1191
  }
1192

1193
  SReplica *pReplica = NULL;
2,520✔
1194
  if (pCreateReq->selfIndex != -1) {
2,520✔
1195
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
2,520✔
1196
  } else {
1197
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1198
  }
1199
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
2,520✔
1200
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
2,520✔
1201
    (void)tFreeSMountVnodeReq(&req);
×
1202
    code = TSDB_CODE_INVALID_MSG;
×
1203
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1204
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1205
    return code;
×
1206
  }
1207
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
2,520✔
1208
  vnodeCfg.mountVgId = req.mountVgId;
2,520✔
1209
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
2,520✔
1210
  wrapperCfg.mountId = req.mountId;
2,520✔
1211

1212
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
2,520✔
1213
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
2,520✔
1214
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1215
    (void)tFreeSMountVnodeReq(&req);
×
1216
    vmReleaseVnode(pMgmt, pVnode);
×
1217
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1218
    return 0;
×
1219
  }
1220
  vmReleaseVnode(pMgmt, pVnode);
2,520✔
1221

1222
  wrapperCfg.diskPrimary = req.diskPrimary;
2,520✔
1223
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
2,520✔
1224
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
2,520✔
1225
  releaseTfs = true;
2,520✔
1226

1227
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
2,520✔
1228
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
2,520✔
1229
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1230
  }
1231
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
2,520✔
1232
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1233
  }
1234
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
2,520✔
1235
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
2,520✔
1236
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
2,520✔
1237
_exit:
2,520✔
1238
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
2,520✔
1239
  if (code != 0) {
2,520✔
1240
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1241
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1242
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1243
    vnodeClose(pImpl);
×
1244
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1245
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1246
  } else {
1247
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
2,520✔
1248
          TMSG_INFO(pMsg->msgType));
1249
  }
1250

1251
  pMsg->code = code;
2,520✔
1252
  pMsg->info.rsp = NULL;
2,520✔
1253
  pMsg->info.rspLen = 0;
2,520✔
1254

1255
  (void)tFreeSMountVnodeReq(&req);
2,520✔
1256
  TAOS_RETURN(code);
2,520✔
1257
}
1258
#endif  // USE_MOUNT
1259

1260
// alter replica doesn't use this, but restore dnode still use this
1261
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,964,681✔
1262
  SAlterVnodeTypeReq req = {0};
1,964,681✔
1263
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
1,964,681✔
1264
    terrno = TSDB_CODE_INVALID_MSG;
×
1265
    return -1;
×
1266
  }
1267

1268
  if (req.learnerReplicas == 0) {
1269
    req.learnerSelfIndex = -1;
1270
  }
1271

1272
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
1,964,681✔
1273
        TMSG_INFO(pMsg->msgType));
1274

1275
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
1,964,681✔
1276
  if (pVnode == NULL) {
1,964,681✔
1277
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1278
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1279
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1280
    return -1;
×
1281
  }
1282

1283
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
1,964,681✔
1284
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
1,964,681✔
1285
  if (role == TAOS_SYNC_ROLE_VOTER) {
1,964,681✔
1286
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1287
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1288
    vmReleaseVnode(pMgmt, pVnode);
×
1289
    return -1;
×
1290
  }
1291

1292
  dInfo("vgId:%d, checking node catch up", req.vgId);
1,964,681✔
1293
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
1,964,681✔
1294
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
1,874,109✔
1295
    vmReleaseVnode(pMgmt, pVnode);
1,874,109✔
1296
    return -1;
1,874,109✔
1297
  }
1298

1299
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
90,572✔
1300

1301
  int32_t vgId = req.vgId;
90,572✔
1302
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
90,572✔
1303
        req.selfIndex, req.strict, req.changeVersion);
1304
  for (int32_t i = 0; i < req.replica; ++i) {
361,896✔
1305
    SReplica *pReplica = &req.replicas[i];
271,324✔
1306
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
271,324✔
1307
  }
1308
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
90,572✔
1309
    SReplica *pReplica = &req.learnerReplicas[i];
×
1310
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1311
  }
1312

1313
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
90,572✔
1314
      req.learnerSelfIndex >= req.learnerReplica) {
90,572✔
1315
    terrno = TSDB_CODE_INVALID_MSG;
×
1316
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1317
    vmReleaseVnode(pMgmt, pVnode);
×
1318
    return -1;
×
1319
  }
1320

1321
  SReplica *pReplica = NULL;
90,572✔
1322
  if (req.selfIndex != -1) {
90,572✔
1323
    pReplica = &req.replicas[req.selfIndex];
90,572✔
1324
  } else {
1325
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1326
  }
1327

1328
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
90,572✔
1329
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
90,572✔
1330
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1331
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1332
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1333
    vmReleaseVnode(pMgmt, pVnode);
×
1334
    return -1;
×
1335
  }
1336

1337
  dInfo("vgId:%d, start to close vnode", vgId);
90,572✔
1338
  SWrapperCfg wrapperCfg = {
90,572✔
1339
      .dropped = pVnode->dropped,
90,572✔
1340
      .vgId = pVnode->vgId,
90,572✔
1341
      .vgVersion = pVnode->vgVersion,
90,572✔
1342
      .diskPrimary = pVnode->diskPrimary,
90,572✔
1343
  };
1344
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
90,572✔
1345

1346
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
90,572✔
1347
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
90,572✔
1348

1349
  int32_t diskPrimary = wrapperCfg.diskPrimary;
90,572✔
1350
  char    path[TSDB_FILENAME_LEN] = {0};
90,572✔
1351
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
90,572✔
1352

1353
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
90,572✔
1354
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
90,572✔
1355
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1356
    return -1;
×
1357
  }
1358

1359
  dInfo("vgId:%d, begin to open vnode", vgId);
90,572✔
1360
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
90,572✔
1361
  if (pImpl == NULL) {
90,572✔
1362
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1363
    return -1;
×
1364
  }
1365

1366
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
90,572✔
1367
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1368
    return -1;
×
1369
  }
1370

1371
  if (vnodeStart(pImpl) != 0) {
90,572✔
1372
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1373
    return -1;
×
1374
  }
1375

1376
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
90,572✔
1377
        req.vgId, TMSG_INFO(pMsg->msgType));
1378
  return 0;
90,572✔
1379
}
1380

1381
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1382
  SCheckLearnCatchupReq req = {0};
×
1383
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1384
    terrno = TSDB_CODE_INVALID_MSG;
×
1385
    return -1;
×
1386
  }
1387

1388
  if (req.learnerReplicas == 0) {
1389
    req.learnerSelfIndex = -1;
1390
  }
1391

1392
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1393
        TMSG_INFO(pMsg->msgType));
1394

1395
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1396
  if (pVnode == NULL) {
×
1397
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1398
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1399
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1400
    return -1;
×
1401
  }
1402

1403
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1404
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1405
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1406
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1407
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1408
    vmReleaseVnode(pMgmt, pVnode);
×
1409
    return -1;
×
1410
  }
1411

1412
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1413
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1414
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1415
    vmReleaseVnode(pMgmt, pVnode);
×
1416
    return -1;
×
1417
  }
1418

1419
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1420

1421
  vmReleaseVnode(pMgmt, pVnode);
×
1422

1423
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1424
        TMSG_INFO(pMsg->msgType));
1425

1426
  return 0;
×
1427
}
1428

1429
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
20,911✔
1430
  SDisableVnodeWriteReq req = {0};
20,911✔
1431
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
20,911✔
1432
    terrno = TSDB_CODE_INVALID_MSG;
×
1433
    return -1;
×
1434
  }
1435

1436
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
20,911✔
1437

1438
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
20,911✔
1439
  if (pVnode == NULL) {
20,911✔
1440
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1441
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1442
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1443
    return -1;
×
1444
  }
1445

1446
  pVnode->disable = req.disable;
20,911✔
1447
  vmReleaseVnode(pMgmt, pVnode);
20,911✔
1448
  return 0;
20,911✔
1449
}
1450

1451
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,538✔
1452
  SMsgHead *pHead = pMsg->pCont;
3,538✔
1453
  pHead->contLen = ntohl(pHead->contLen);
3,538✔
1454
  pHead->vgId = ntohl(pHead->vgId);
3,538✔
1455

1456
  SVndSetKeepVersionReq req = {0};
3,538✔
1457
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
3,538✔
1458
                                        &req) != 0) {
1459
    terrno = TSDB_CODE_INVALID_MSG;
×
1460
    return -1;
×
1461
  }
1462

1463
  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);
3,538✔
1464

1465
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
3,538✔
1466
  if (pVnode == NULL) {
3,538✔
1467
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
×
1468
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1469
    return -1;
×
1470
  }
1471

1472
  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
1473
  // This bypasses Raft to avoid timing issues where WAL might be deleted
1474
  // before keepVersion is set through the Raft consensus process
1475
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
3,538✔
1476
  if (code != TSDB_CODE_SUCCESS) {
3,538✔
1477
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
×
1478
    terrno = code;
×
1479
    vmReleaseVnode(pMgmt, pVnode);
×
1480
    return -1;
×
1481
  }
1482

1483
  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);
3,538✔
1484

1485
  vmReleaseVnode(pMgmt, pVnode);
3,538✔
1486
  return 0;
3,538✔
1487
}
1488

1489
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
20,178✔
1490
  SAlterVnodeHashRangeReq req = {0};
20,178✔
1491
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
20,178✔
1492
    terrno = TSDB_CODE_INVALID_MSG;
×
1493
    return -1;
×
1494
  }
1495

1496
  int32_t srcVgId = req.srcVgId;
20,178✔
1497
  int32_t dstVgId = req.dstVgId;
20,178✔
1498

1499
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
20,178✔
1500
  if (pVnode != NULL) {
20,178✔
1501
    dError("vgId:%d, vnode already exist", dstVgId);
×
1502
    vmReleaseVnode(pMgmt, pVnode);
×
1503
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1504
    return -1;
×
1505
  }
1506

1507
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
20,178✔
1508
        req.dstVgId);
1509
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
20,178✔
1510
  if (pVnode == NULL) {
20,178✔
1511
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1512
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1513
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1514
    return -1;
×
1515
  }
1516

1517
  SWrapperCfg wrapperCfg = {
20,178✔
1518
      .dropped = pVnode->dropped,
20,178✔
1519
      .vgId = dstVgId,
1520
      .vgVersion = pVnode->vgVersion,
20,178✔
1521
      .diskPrimary = pVnode->diskPrimary,
20,178✔
1522
  };
1523
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
20,178✔
1524

1525
  // prepare alter
1526
  pVnode->toVgId = dstVgId;
20,178✔
1527
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
20,178✔
1528
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1529
    return -1;
×
1530
  }
1531

1532
  dInfo("vgId:%d, close vnode", srcVgId);
20,178✔
1533
  vmCloseVnode(pMgmt, pVnode, true, false);
20,178✔
1534

1535
  int32_t diskPrimary = wrapperCfg.diskPrimary;
20,178✔
1536
  char    srcPath[TSDB_FILENAME_LEN] = {0};
20,178✔
1537
  char    dstPath[TSDB_FILENAME_LEN] = {0};
20,178✔
1538
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
20,178✔
1539
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
20,178✔
1540

1541
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
20,178✔
1542
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
20,178✔
1543
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1544
    return -1;
×
1545
  }
1546

1547
  dInfo("vgId:%d, open vnode", dstVgId);
20,178✔
1548
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
20,178✔
1549

1550
  if (pImpl == NULL) {
20,178✔
1551
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1552
    return -1;
×
1553
  }
1554

1555
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
20,178✔
1556
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1557
    return -1;
×
1558
  }
1559

1560
  if (vnodeStart(pImpl) != 0) {
20,178✔
1561
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1562
    return -1;
×
1563
  }
1564

1565
  // complete alter
1566
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
20,178✔
1567
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1568
    return -1;
×
1569
  }
1570

1571
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
20,178✔
1572
  return 0;
20,178✔
1573
}
1574

1575
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
626,297✔
1576
  SAlterVnodeReplicaReq alterReq = {0};
626,297✔
1577
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
626,297✔
1578
    terrno = TSDB_CODE_INVALID_MSG;
×
1579
    return -1;
×
1580
  }
1581

1582
  if (alterReq.learnerReplica == 0) {
626,297✔
1583
    alterReq.learnerSelfIndex = -1;
449,465✔
1584
  }
1585

1586
  int32_t vgId = alterReq.vgId;
626,297✔
1587
  dInfo(
626,297✔
1588
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1589
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1590
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1591
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1592

1593
  for (int32_t i = 0; i < alterReq.replica; ++i) {
2,431,868✔
1594
    SReplica *pReplica = &alterReq.replicas[i];
1,805,571✔
1595
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
1,805,571✔
1596
  }
1597
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
803,129✔
1598
    SReplica *pReplica = &alterReq.learnerReplicas[i];
176,832✔
1599
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
176,832✔
1600
  }
1601

1602
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
626,297✔
1603
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
626,297✔
1604
    terrno = TSDB_CODE_INVALID_MSG;
×
1605
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1606
    return -1;
×
1607
  }
1608

1609
  SReplica *pReplica = NULL;
626,297✔
1610
  if (alterReq.selfIndex != -1) {
626,297✔
1611
    pReplica = &alterReq.replicas[alterReq.selfIndex];
626,297✔
1612
  } else {
1613
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1614
  }
1615

1616
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
626,297✔
1617
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
626,297✔
1618
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1619
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
×
1620
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1621
    return -1;
×
1622
  }
1623

1624
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
626,297✔
1625
  if (pVnode == NULL) {
626,297✔
1626
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1627
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1628
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1629
    return -1;
×
1630
  }
1631

1632
  dInfo("vgId:%d, start to close vnode", vgId);
626,297✔
1633
  SWrapperCfg wrapperCfg = {
626,297✔
1634
      .dropped = pVnode->dropped,
626,297✔
1635
      .vgId = pVnode->vgId,
626,297✔
1636
      .vgVersion = pVnode->vgVersion,
626,297✔
1637
      .diskPrimary = pVnode->diskPrimary,
626,297✔
1638
  };
1639
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
626,297✔
1640

1641
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
626,297✔
1642
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
626,297✔
1643

1644
  int32_t diskPrimary = wrapperCfg.diskPrimary;
626,297✔
1645
  char    path[TSDB_FILENAME_LEN] = {0};
626,297✔
1646
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
626,297✔
1647

1648
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
626,297✔
1649
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
626,297✔
1650
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1651
    return -1;
×
1652
  }
1653

1654
  dInfo("vgId:%d, begin to open vnode", vgId);
626,297✔
1655
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
626,297✔
1656
  if (pImpl == NULL) {
626,297✔
1657
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1658
    return -1;
×
1659
  }
1660

1661
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
626,297✔
1662
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1663
    return -1;
×
1664
  }
1665

1666
  if (vnodeStart(pImpl) != 0) {
626,297✔
1667
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1668
    return -1;
×
1669
  }
1670

1671
  dInfo(
626,297✔
1672
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1673
      "learnerSelfIndex:%d strict:%d",
1674
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1675
      alterReq.learnerSelfIndex, alterReq.strict);
1676
  return 0;
626,297✔
1677
}
1678

1679
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
22,893✔
1680
  SAlterVnodeElectBaselineReq alterReq = {0};
22,893✔
1681
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
22,893✔
1682
    return TSDB_CODE_INVALID_MSG;
×
1683
  }
1684

1685
  int32_t vgId = alterReq.vgId;
22,893✔
1686
  dInfo(
22,893✔
1687
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
1688
      vgId, TMSG_INFO(pMsg->msgType), alterReq.electBaseLine);
1689

1690
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
22,893✔
1691
  if (pVnode == NULL) {
22,893✔
1692
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1693
    return terrno;
×
1694
  }
1695

1696
  if(vnodeSetElectBaseline(pVnode->pImpl, alterReq.electBaseLine) != 0){
22,893✔
1697
    vmReleaseVnode(pMgmt, pVnode);
×
1698
    return -1;
×
1699
  }
1700

1701
  vmReleaseVnode(pMgmt, pVnode);
22,893✔
1702
  return 0;
22,893✔
1703
}
1704

1705
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,588,999✔
1706
  int32_t       code = 0;
1,588,999✔
1707
  SDropVnodeReq dropReq = {0};
1,588,999✔
1708
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
1,588,999✔
1709
    terrno = TSDB_CODE_INVALID_MSG;
×
1710
    return terrno;
×
1711
  }
1712

1713
  int32_t vgId = dropReq.vgId;
1,588,999✔
1714
  dInfo("vgId:%d, start to drop vnode", vgId);
1,588,999✔
1715

1716
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
1,588,999✔
1717
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1718
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
×
1719
    return terrno;
×
1720
  }
1721

1722
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
1,588,999✔
1723
  if (pVnode == NULL) {
1,588,999✔
1724
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1725
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1726
    return terrno;
×
1727
  }
1728

1729
  pVnode->dropped = 1;
1,588,999✔
1730
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
1,588,999✔
1731
    pVnode->dropped = 0;
×
1732
    vmReleaseVnode(pMgmt, pVnode);
×
1733
    return code;
×
1734
  }
1735

1736
  vmCloseVnode(pMgmt, pVnode, false, false);
1,588,999✔
1737
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
1,588,999✔
1738
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1739
  }
1740

1741
  dInfo("vgId:%d, is dropped", vgId);
1,588,999✔
1742
  return 0;
1,588,999✔
1743
}
1744

1745
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
106,485✔
1746
  SVArbHeartBeatReq arbHbReq = {0};
106,485✔
1747
  SVArbHeartBeatRsp arbHbRsp = {0};
106,485✔
1748
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
106,485✔
1749
    terrno = TSDB_CODE_INVALID_MSG;
×
1750
    return -1;
×
1751
  }
1752

1753
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
106,485✔
1754
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1755
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
×
1756
    goto _OVER;
×
1757
  }
1758

1759
  if (strlen(arbHbReq.arbToken) == 0) {
106,485✔
1760
    terrno = TSDB_CODE_INVALID_MSG;
×
1761
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1762
    goto _OVER;
×
1763
  }
1764

1765
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
106,485✔
1766

1767
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
106,485✔
1768
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
106,485✔
1769
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
106,485✔
1770
  if (arbHbRsp.hbMembers == NULL) {
106,485✔
1771
    goto _OVER;
×
1772
  }
1773

1774
  for (int32_t i = 0; i < size; i++) {
226,380✔
1775
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
119,895✔
1776
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
119,895✔
1777
    if (pVnode == NULL) {
119,895✔
1778
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
25,247✔
1779
      continue;
25,247✔
1780
    }
1781

1782
    SVArbHbRspMember rspMember = {0};
94,648✔
1783
    rspMember.vgId = pReqMember->vgId;
94,648✔
1784
    rspMember.hbSeq = pReqMember->hbSeq;
94,648✔
1785
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
94,648✔
1786
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1787
      vmReleaseVnode(pMgmt, pVnode);
×
1788
      continue;
×
1789
    }
1790

1791
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
94,648✔
1792
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1793
      vmReleaseVnode(pMgmt, pVnode);
×
1794
      continue;
×
1795
    }
1796

1797
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
189,296✔
1798
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1799
      vmReleaseVnode(pMgmt, pVnode);
×
1800
      goto _OVER;
×
1801
    }
1802

1803
    vmReleaseVnode(pMgmt, pVnode);
94,648✔
1804
  }
1805

1806
  SRpcMsg rspMsg = {.info = pMsg->info};
106,485✔
1807
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
106,485✔
1808
  if (rspLen < 0) {
106,485✔
1809
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1810
    goto _OVER;
×
1811
  }
1812

1813
  void *pRsp = rpcMallocCont(rspLen);
106,485✔
1814
  if (pRsp == NULL) {
106,485✔
1815
    terrno = terrno;
×
1816
    goto _OVER;
×
1817
  }
1818

1819
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
106,485✔
1820
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1821
    rpcFreeCont(pRsp);
×
1822
    goto _OVER;
×
1823
  }
1824
  pMsg->info.rsp = pRsp;
106,485✔
1825
  pMsg->info.rspLen = rspLen;
106,485✔
1826

1827
  terrno = TSDB_CODE_SUCCESS;
106,485✔
1828

1829
_OVER:
106,485✔
1830
  tFreeSVArbHeartBeatReq(&arbHbReq);
106,485✔
1831
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
106,485✔
1832
  return terrno;
106,485✔
1833
}
1834

1835
SArray *vmGetMsgHandles() {
669,427✔
1836
  int32_t code = -1;
669,427✔
1837
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
669,427✔
1838
  if (pArray == NULL) goto _OVER;
669,427✔
1839

1840
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1841
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
669,427✔
1842
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
669,427✔
1843
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
669,427✔
1844
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1845
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1846
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1847
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1848
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1849
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1850
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1851
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1852
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1853
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1854
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1855
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1856
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1857
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1858
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1859
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1860
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1861
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1862
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1863
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1864
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1865
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1866
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1868
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1869
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1870
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1871
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
669,427✔
1872
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
669,427✔
1873
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1874
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1875
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1877
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1878
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1881
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1882
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1883
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1884
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1885
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1886
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1887
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1891

1892
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1893
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1894
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1895
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1896
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1899
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1900
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1901
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1902
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1903
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
669,427✔
1904
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1905
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1906
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1907
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1908
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1909
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1910
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1911
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1912
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1913
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1914
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1915

1916
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1917
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1918
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1919
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1920
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1921
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1922
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1923
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1924
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1925
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1926
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1927
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1928

1929
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
669,427✔
1930
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
669,427✔
1931
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
669,427✔
1932
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
669,427✔
1933
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
669,427✔
1934

1935
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
669,427✔
1936
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
669,427✔
1937
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
669,427✔
1938

1939
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
669,427✔
1940
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
669,427✔
1941
  code = 0;
669,427✔
1942

1943
_OVER:
669,427✔
1944
  if (code != 0) {
669,427✔
1945
    taosArrayDestroy(pArray);
×
1946
    return NULL;
×
1947
  } else {
1948
    return pArray;
669,427✔
1949
  }
1950
}
1951

1952
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1953
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1954

1955
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1956
  while (pIter) {
×
1957
    SVnodeObj **ppVnode = pIter;
×
1958
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1959
      continue;
×
1960
    }
1961

1962
    SVnodeObj *pVnode = *ppVnode;
×
1963
    if (!pVnode->failed) {
×
1964
      SRawWriteMetrics metrics = {0};
×
1965
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1966
        // Add the metrics to the global metrics system with cluster ID
1967
        SName   name = {0};
×
1968
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1969
        if (code < 0) {
×
1970
          dError("failed to get db name since %s", tstrerror(code));
×
1971
          continue;
×
1972
        }
1973
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1974
        if (code != TSDB_CODE_SUCCESS) {
×
1975
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1976
        } else {
1977
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1978
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1979
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1980
          }
1981
        }
1982
      } else {
1983
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1984
      }
1985
    }
1986
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1987
  }
1988

1989
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1990
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc