• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.94
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
/**
 * Collect a load snapshot for every vnode currently registered in the running hash.
 *
 * @param pMgmt   vnode management object; its hashLock is taken for reading while iterating.
 * @param pInfo   output; pInfo->pVloads is set to a newly created array of SVnodeLoad, or left
 *                NULL when the array cannot be created. The array is handed to the caller
 *                (vmGetMonitorInfo releases it with taosArrayDestroy).
 * @param isReset when true, vnodeResetLoad() is invoked for every non-failed vnode after its
 *                load has been read.
 *
 * Failed vnodes are still reported, but only with their vgId filled in.
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header so that `continue` always moves on to the next
  // entry; advancing at the bottom of a while-loop made `continue` spin on the same slot forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
58

59
/**
 * Apply the configured election interval (tsVnodeElectIntervalMs) to every vnode in the
 * running hash. Failures are logged per vnode and do not stop the walk.
 *
 * @param pMgmt vnode management object; its hashLock is held for reading during the walk.
 */
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advance in the for-header: a `continue` must not re-test the same hash slot indefinitely.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
77

78
/**
 * Collect the lightweight load record (SVnodeLoadLite) of every healthy vnode.
 *
 * @param pMgmt vnode management object; its hashLock is held for reading during the walk.
 * @param pInfo output; pInfo->pVloads receives a newly created array of SVnodeLoadLite.
 *              It is NULL when the array could not be created or when appending an element
 *              failed (in which case the partially filled array is destroyed).
 *
 * Vnodes flagged as failed, and vnodes for which vnodeGetLoadLite() reports an error, are
 * omitted from the result.
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advance in the for-header so `continue` cannot loop forever on one slot.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // Leaving a hash iteration early requires an explicit cancel to release the iterator.
      taosHashCancelIterate(pMgmt->runngingHash, pIter);
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
105

106
/**
 * Aggregate per-vnode loads into the monitor report and mirror the totals into pMgmt->state.
 *
 * The loads are fetched with reset enabled, so the request counters summed here are the
 * values accumulated since the previous reset. TFS monitor information is appended to
 * pInfo->tfs; a failure there is only logged.
 *
 * @param pMgmt vnode management object whose state counters are refreshed.
 * @param pInfo monitor report to fill (vstat and tfs members).
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoads = loadInfo.pVloads;
  if (pLoads == NULL) return;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pLoads); ++idx) {
    SVnodeLoad *pEntry = taosArrayGet(pLoads, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    // Both regular and assigned leaders count as masters.
    switch (pEntry->syncState) {
      case TAOS_SYNC_STATE_LEADER:
      case TAOS_SYNC_STATE_ASSIGNED_LEADER:
        ++leaderCnt;
        break;
      default:
        break;
    }
    ++vnodeCnt;
  }

  // Report for the monitor; the request counters are deltas.
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  // Keep the management state in step with what was just reported.
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
154

155
/**
 * Drop insert-counter samples whose vgroup is no longer present in the running hash.
 *
 * The counter is asked for its keys and the vgroup id each key belongs to; every key whose
 * vgroup id is not found in pMgmt->runngingHash is deleted from tsInsertCounter.
 *
 * @param pMgmt vnode management object; its hashLock is held for reading during the lookup.
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
  if (list_size == 0) return;

  // Start from NULL so the guards below are meaningful even if the callee leaves an
  // out-parameter untouched.
  int32_t *vgroup_ids = NULL;
  char   **keys = NULL;

  int r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
  if (r) {
    dError("failed to get vgroup ids");
    return;
  }

  if (vgroup_ids != NULL && keys != NULL) {
    (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
    for (int i = 0; i < list_size; i++) {
      int32_t vgroup_id = vgroup_ids[i];
      void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
      if (vnode != NULL) continue;

      r = taos_counter_delete(tsInsertCounter, keys[i]);
      if (r) {
        dError("failed to delete monitor sample key:%s", keys[i]);
      }
    }
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
  }

  if (vgroup_ids) taosMemoryFree(vgroup_ids);
  if (keys) taosMemoryFree(keys);
}
182

183
/**
 * Remove metrics that belong to vgroups which are no longer running on this dnode.
 *
 * Does nothing unless monitoring and metrics are both enabled and a monitor endpoint is
 * configured. Otherwise the vgIds of all running vnodes are collected into a temporary hash
 * set, which is passed to cleanupExpiredMetrics().
 *
 * @param pMgmt vnode management object; its hashLock is held for reading while the set is built.
 */
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Create the set before touching the running hash: bailing out after taosHashIterate() has
  // started would abandon a live iterator.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    int32_t vgId = pVnode->vgId;
    char    dummy = 1;  // hash table value (we only care about the key)
    if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
      dError("failed to put vgId:%d to valid vgroups hash", vgId);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
217

218
/**
 * Build a vnode configuration from a create-vnode request.
 *
 * pCfg is first overwritten with vnodeCfgDefault and then every field carried by the request
 * is copied over. Sizes given in KB/MB by the request (pageSize, tsdbPageSize, buffer) are
 * converted to bytes. The sync node list is filled with the voters first, followed by the
 * learners; myIndex is only assigned when the request names a self index.
 *
 * @param pCreate deserialized create-vnode request (read only here).
 * @param pCfg    configuration to fill.
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;

  // tsdb
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;

  const size_t nRetentions = taosArrayGetSize(pCreate->pRetensions);
  for (size_t i = 0; i < nRetentions; ++i) {
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
    // Only the first retention decides whether the vnode is flagged as rsma.
    if (i == 0 && pRetention->freq >= 0 && pRetention->keep > 0) {
      pCfg->isRsma = 1;
    }
  }

  // wal
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;

  // Encryption: tsdb, wal and tdb all take the same algorithm, and the same key when the
  // algorithm is SM4. Builds without the feature force the algorithm to 0.
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgorithm = 0;
  pCfg->walCfg.encryptAlgorithm = 0;
  pCfg->tdbEncryptAlgorithm = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  // sync
  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum doubles as the source index.
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
    // The boolean result is not needed here; the call is made for its update of pNode.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // Learners follow the voters; totalReplicaNum temporarily counts learners only.
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    // A learner's position in nodeInfo is offset by the number of voters.
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
9,726✔
325

326
/**
 * Fill the wrapper configuration for a vnode that is about to be created.
 *
 * The vnode directory is "<pMgmt->path><TD_DIRSEP>vnode<vgId>"; the vnode starts out as not
 * dropped.
 *
 * @param pMgmt   vnode management object supplying the base path.
 * @param pCreate create-vnode request supplying vgId and vgVersion.
 * @param pCfg    wrapper configuration to fill.
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = vgId;
}
9,724✔
332

333
/**
 * For a tsma vnode, let smaGetTSmaDays() adjust pCfg->tsdbCfg.days from the tsma message
 * attached to the request. Non-tsma requests are left untouched.
 *
 * @param pCfg vnode configuration whose tsdbCfg.days may be rewritten.
 * @param pReq create-vnode request; pReq->pTsma is read as an SMsgHead followed by the payload.
 * @return 0 when nothing had to be done, TSDB_CODE_INVALID_MSG when the tsma message is
 *         missing or shorter than its own header, otherwise the result of smaGetTSmaDays().
 */
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *smaMsg = pReq->pTsma;
  if (smaMsg == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }

  // contLen is stored in network byte order and includes the header itself; reject values
  // that would make the unsigned subtraction below wrap around.
  uint32_t msgLen = (uint32_t)htonl(smaMsg->contLen);
  if (msgLen < sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_MSG;
  }

  uint32_t contLen = (uint32_t)(msgLen - sizeof(SMsgHead));
  return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
}
341

342
/**
 * Handle a create-vnode request: validate it, create the vnode on disk, open it, register it
 * with the vnode manager, start it and persist the vnode list.
 *
 * @param pMgmt vnode management object.
 * @param pMsg  RPC message whose content is a serialized SCreateVnodeReq.
 * @return 0 on success, and also when the vnode already exists (the request is treated as
 *         already satisfied); otherwise an error code. On the paths that reach _OVER the same
 *         code is stored in terrno.
 *
 * If any step after vnodeCreate() fails, the partially created vnode is closed and destroyed
 * again.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Report the learner's own dnode id (the voter array was indexed here by mistake).
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Locate the entry describing this dnode. The indexes come from the wire, so they are
  // range-checked before use; with neither index set there is nothing to compare against.
  SReplica *pReplica = NULL;
  if (req.selfIndex >= 0 && req.selfIndex < req.replica) {
    pReplica = &req.replicas[req.selfIndex];
  } else if (req.selfIndex == -1 && req.learnerSelfIndex >= 0 && req.learnerSelfIndex < req.learnerReplica) {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica == NULL) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, invalid self index in request, selfIndex:%d learnerSelfIndex:%d, %s", req.vgId, req.selfIndex,
           req.learnerSelfIndex, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    // Log first: pReplica points into req.
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgorithm == DND_CA_SM4) {
    if (strlen(tsEncryptKey) == 0) {
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
      (void)tFreeSCreateVnodeReq(&req);
      return code;
    }
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
    // Nothing has been created yet. Jumping to _OVER from here would skip the initialisation
    // of pImpl and run the close/destroy cleanup on an indeterminate pointer and an empty path.
    (void)tFreeSCreateVnodeReq(&req);
    terrno = code;
    return code;
  }

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // Reported as success on purpose: the requested vnode is already there.
    return 0;
  }

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    // Roll back: unregister, close and remove whatever was created above.
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
485

486
#ifdef USE_MOUNT
487
/*
 * Key record used while retrieving mount information: compareVgDiskPrimary() orders these
 * records by dbId first and vgId second.
 */
typedef struct {
  int64_t dbId;         // database id (primary sort key)
  int32_t vgId;         // vgroup id (secondary sort key)
  int32_t diskPrimary;  // presumably the primary disk index of the vgroup - TODO confirm
} SMountDbVgId;
492
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
493
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
494

495
static int compareVnodeInfo(const void *p1, const void *p2) {
21✔
496
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
21✔
497
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
21✔
498

499
  if (v1->config.dbId == v2->config.dbId) {
21✔
500
    if (v1->config.vgId == v2->config.vgId) {
12!
501
      return 0;
×
502
    }
503
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
12!
504
  }
505

506
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
9!
507
}
508
static int compareVgDiskPrimary(const void *p1, const void *p2) {
21✔
509
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
21✔
510
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
21✔
511

512
  if (v1->dbId == v2->dbId) {
21✔
513
    if (v1->vgId == v2->vgId) {
12!
514
      return 0;
×
515
    }
516
    return v1->vgId > v2->vgId ? 1 : -1;
12!
517
  }
518

519
  return v1->dbId > v2->dbId ? 1 : -1;
9!
520
}
521

522
/**
 * Read the dnode-level information of a mount path.
 *
 * Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount when the dnode is
 * marked dropped, uses an encrypt scope other than 0, or carries no cluster id; the cluster id
 * is stored in pMountInfo->clusterId.
 * Step 2 obtains the disk list through vmGetMountDisks() and copies each directory name into
 * pMountInfo->pDisks[level], with the primary disk placed first in level 0.
 *
 * @param pMgmt      vnode management object (forwarded to vmGetMountDisks).
 * @param pReq       retrieve request supplying mountPath, mountName and dnodeId.
 * @param pMountInfo output; clusterId and the per-level disk arrays are filled. Directory
 *                   strings stored in the arrays are heap copies; on failure, entries added
 *                   so far stay in pMountInfo (NOTE(review): presumably released by the
 *                   caller - confirm).
 * @return 0 on success, otherwise an error code (also logged together with the failing line).
 */
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;

  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // A short read may leave terrno at 0; never continue with a partially filled buffer.
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_INVALID_JSON_FORMAT);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;

  // dnode.json is no longer needed; release it before the disk scan.
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);

  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
    // The level comes from a configuration file and is used as an array index below.
    if (pDisk->level < 0 || pDisk->level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d", pReq->mountName, (int32_t)pDisk->level);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
    if (!pMountInfo->pDisks[pDisk->level]) {
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[pDisk->level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  // Level 0 needs at least one disk; no level may exceed the per-tier maximum.
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
619

620
/**
 * Load the configuration of every vnode under the mount path and group it by database.
 *
 * Steps:
 *  1. read the vnode list through vmGetVnodeListFromFile() using "<mountPath>/<vnode dir>";
 *  2. for every vnode that is not dropped: check r/w access, load its info and reject
 *     vnodes with more than one replica, a mismatching vgId or any encryption enabled;
 *  3. sort the loaded infos, then verify the cluster id, that consecutive vgIds differ,
 *     and count the databases;
 *  4. build one SMountDbInfo per database holding an SMountVgInfo for each of its vnodes.
 *
 * On success the db array is stored in pMountInfo->pDbs (NULL when no database was found).
 * On failure pMountInfo->pDbs is not modified and the partially built db array is released.
 *
 * @param pMgmt      not referenced by this function
 * @param pReq       mount request; mountName, mountPath and dnodeId are read
 * @param pMountInfo in: pDisks[0] and clusterId, out: pDbs
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;

  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  // pVgCfgs is created with numOfVnodes slots; only the first nVgLoaded of them get filled below
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, nVgLoaded = 0;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char **ppDiskDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!ppDiskDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *ppDiskDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, nVgLoaded);
    ++nVgLoaded;
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
               pInfo->config.walCfg.encryptAlgorithm) {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
             pInfo->config.tsdbCfg.encryptAlgorithm);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // Loaded infos occupy slots [0, nVgLoaded); the trailing nVgDropped slots were never filled and
    // must be cut off so that pVgCfgs matches pDiskPrimarys. The previous call passed the kept count
    // as the removal length, starting one slot too early.
    // NOTE(review): relies on taosArrayRemoveBatch(array, startIndex, count, freeFp) - confirm in tarray.h
    taosArrayRemoveBatch(pVgCfgs, nVgLoaded, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    // both arrays are indexed in parallel below, so they are sorted together
    // (presumably both comparators order by dbId then vgId - verify against their definitions)
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    // a change of dbId between consecutive entries starts a new database
    if (dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    // two consecutive entries with the same vgId mean the vnode list is corrupted
    if (vgId == pInfo->config.vgId) {
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    int32_t dbIdx = -1;
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        // first vnode of a database: open the next db slot
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  // ownership of the db array moves to the mount info only on success
  pMountInfo->pDbs = pDbInfos;

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
    // pDbInfos was not handed over to pMountInfo, so release it here together with the per-db vg arrays
    if (pDbInfos != NULL) {
      int32_t nDbInfo = taosArrayGetSize(pDbInfos);
      for (int32_t i = 0; i < nDbInfo; ++i) {
        SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
        taosArrayDestroy(pDbInfo->pVgs);
        pDbInfo->pVgs = NULL;
      }
      taosArrayDestroy(pDbInfos);
      pDbInfos = NULL;
    }
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
791

792
/**
793
 *   Retrieve the stables from vnode meta.
794
 */
795
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
2✔
796
  int32_t code = 0, lino = 0;
2✔
797
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
2✔
798
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
2✔
799
  SArray *suidList = NULL;
2✔
800
  SArray *pCols = NULL;
2✔
801
  SArray *pTags = NULL;
2✔
802
  SArray *pColExts = NULL;
2✔
803
  SArray *pTagExts = NULL;
2✔
804

805
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
2✔
806
  for (int32_t i = 0; i < nDb; ++i) {
6✔
807
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
4✔
808
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
4✔
809
    for (int32_t j = 0; j < nVg; ++j) {
4!
810
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
4✔
811
      SVnode        vnode = {
4✔
812
                 .config.vgId = pVgInfo->vgId,
4✔
813
                 .config.dbId = pVgInfo->dbId,
4✔
814
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
4✔
815
                 .config.szPage = pVgInfo->szPage,
4✔
816
                 .config.szCache = pVgInfo->szCache,
4✔
817
                 .config.szBuf = pVgInfo->szBuf,
4✔
818
                 .config.cacheLast = pVgInfo->cacheLast,
4✔
819
                 .config.standby = pVgInfo->standby,
4✔
820
                 .config.hashMethod = pVgInfo->hashMethod,
4✔
821
                 .config.hashBegin = pVgInfo->hashBegin,
4✔
822
                 .config.hashEnd = pVgInfo->hashEnd,
4✔
823
                 .config.hashPrefix = pVgInfo->hashPrefix,
4✔
824
                 .config.hashSuffix = pVgInfo->hashSuffix,
4✔
825
                 .config.sttTrigger = pVgInfo->sttTrigger,
4✔
826
                 .config.syncCfg.replicaNum = pVgInfo->replications,
4✔
827
                 .config.tsdbCfg.precision = pVgInfo->precision,
4✔
828
                 .config.tsdbCfg.compression = pVgInfo->compression,
4✔
829
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
4✔
830
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
4✔
831
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
4✔
832
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
4✔
833
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
4✔
834
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
4✔
835
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
4✔
836
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
4✔
837
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
4✔
838
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
4✔
839
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
4✔
840
                 .config.ssCompact = pVgInfo->ssCompact,
4✔
841
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
4✔
842
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
4✔
843
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
4✔
844
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
4✔
845
                 .config.walCfg.segSize = pVgInfo->walSegSize,
4✔
846
                 .config.walCfg.level = pVgInfo->walLevel,
4✔
847
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
4✔
848
                 .diskPrimary = pVgInfo->diskPrimary,
4✔
849
      };
850
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
4✔
851
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
4✔
852
               pVgInfo->vgId);
853
      vnode.path = path;
4✔
854

855
      int32_t rollback = vnodeShouldRollback(&vnode);
4✔
856
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
4!
857
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
858
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
859
        TAOS_CHECK_EXIT(code);
×
860
      } else {
861
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
4!
862
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
863

864
        SMetaReader mr = {0};
4✔
865
        tb_uid_t    suid = 0;
4✔
866
        SMeta      *pMeta = vnode.pMeta;
4✔
867

868
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
4✔
869
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
4!
870
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
871
        }
872
        taosArrayClear(suidList);
4✔
873
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
4!
874
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
4!
875
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
876
        int32_t nStbs = taosArrayGetSize(suidList);
4✔
877
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
4!
878
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
879
        }
880
        for (int32_t i = 0; i < nStbs; ++i) {
16✔
881
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
12✔
882
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
12!
883
                pVgInfo->dbId, suid, pReq->dnodeId);
884
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
12!
885
            TSDB_CHECK_CODE(code, lino, _exit0);
×
886
          }
887
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
12!
888
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
12!
889
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
890
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
891
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
892
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
893
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
894
          }
895
          SMountStbInfo stbInfo = {
12✔
896
              .req.source = TD_REQ_FROM_APP,
897
              .req.suid = suid,
898
              .req.colVer = mr.me.stbEntry.schemaRow.version,
12✔
899
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
12✔
900
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
12✔
901
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
12✔
902
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
12✔
903
          };
904
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
12✔
905
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
12!
906
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
907
          }
908
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
12!
909
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
910
          }
911

912
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
12!
913
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
914
          }
915
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
12!
916
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
917
          }
918
          taosArrayClear(pCols);
12✔
919
          taosArrayClear(pTags);
12✔
920
          taosArrayClear(pColExts);
12✔
921
          taosArrayClear(pTagExts);
12✔
922
          stbInfo.req.pColumns = pCols;
12✔
923
          stbInfo.req.pTags = pTags;
12✔
924
          stbInfo.pColExts = pColExts;
12✔
925
          stbInfo.pTagExts = pTagExts;
12✔
926

927
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
72✔
928
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
60✔
929
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
60✔
930
            SFieldWithOptions col = {
60✔
931
                .type = pSchema->type,
60✔
932
                .flags = pSchema->flags,
60✔
933
                .bytes = pSchema->bytes,
60✔
934
                .compress = pColComp->alg,
60✔
935
            };
936
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
60✔
937
            if (pSchema->colId != pColComp->id) {
60!
938
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
939
            }
940
            if (mr.me.pExtSchemas) {
60!
941
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
942
            }
943
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
60!
944
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
120!
945
          }
946
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
28✔
947
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
16✔
948
            SField   tag = {
16✔
949
                  .type = pSchema->type,
16✔
950
                  .flags = pSchema->flags,
16✔
951
                  .bytes = pSchema->bytes,
16✔
952
            };
953
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
16✔
954
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
16!
955
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
32!
956
          }
957
          tDecoderClear(&mr.coder);
12✔
958

959
          // serialize the SMountStbInfo
960
          int32_t firstPartLen = 0;
12✔
961
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
12✔
962
          if (msgLen <= 0) {
12!
963
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
964
          }
965
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
12!
966
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
12!
967
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
12✔
968
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
12✔
969
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
12!
970
            taosMemoryFree(pBuf);
×
971
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
972
          }
973
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
24!
974
            taosMemoryFree(pBuf);
×
975
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
976
          }
977
        }
978
      _exit0:
4✔
979
        metaReaderClear(&mr);
4✔
980
        metaClose(&vnode.pMeta);
4✔
981
        TAOS_CHECK_EXIT(code);
4!
982
      }
983
      break;  // retrieve stbs from one vnode is enough
4✔
984
    }
985
  }
986
_exit:
2✔
987
  if (code != 0) {
2!
988
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
989
           pReq->dnodeId, tstrerror(code), path);
990
  }
991
  taosArrayDestroy(suidList);
2✔
992
  taosArrayDestroy(pCols);
2✔
993
  taosArrayDestroy(pTags);
2✔
994
  taosArrayDestroy(pColExts);
2✔
995
  taosArrayDestroy(pTagExts);
2✔
996
  TAOS_RETURN(code);
2✔
997
}
998

999
/**
 * Open (creating/truncating) "<mountPath>/.running" and try to lock it.
 *
 * The lock is attempted at least once. After every failed attempt the function sleeps for one
 * second, logs the failure, and tries again until the number of failed attempts reaches retryLimit.
 *
 * @param mountName  mount name, used for logging only
 * @param mountPath  directory in which the ".running" file lives
 * @param pFile      out: the opened and locked file; closed and reset to NULL on failure
 * @param retryLimit number of failed lock attempts after which the function gives up
 * @return 0 on success, terrno when the file cannot be opened, or the last lock error
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0, lino = 0;
  int32_t lockRet = 0;
  int32_t nFailed = 0;
  char    filepath[PATH_MAX] = {0};

  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);

  *pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC);
  TSDB_CHECK_NULL(*pFile, code, lino, _exit, terrno);

  for (;;) {
    lockRet = taosLockFile(*pFile);
    if (lockRet == 0) {
      break;
    }
    // back off before the next attempt; the counter is bumped first so the log shows it 1-based
    taosMsleep(1000);
    nFailed += 1;
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(lockRet), nFailed);
    if (nFailed >= retryLimit) {
      break;
    }
  }
  TAOS_CHECK_EXIT(lockRet);

_exit:
  if (code != 0) {
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           filepath);
  }
  TAOS_RETURN(code);
}
1025

1026
/**
 * Sanity-check a mount path before anything is read from it.
 *
 * Checks, in this order:
 *  1. the mount path itself is accessible;
 *  2. the ".running" file under the mount path can be locked (see vmMountCheckRunning);
 *     the file handle is kept in pMountInfo->pFile;
 *  3. "<mountPath>/<dnode dir>/dnode.json", "<mountPath>/<mnode dir>", "<mountPath>/<vnode dir>"
 *     and "<mountPath>/<dnode dir>/config/local.json" are accessible.
 *
 * @param pMgmt      not referenced by this function
 * @param pReq       mount request; mountName, mountPath and dnodeId are read
 * @param pMountInfo out: pFile receives the locked ".running" file
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};

  // `path` is what the failure log prints; seed it with the mount path so that a failure of the
  // first two checks does not report an empty path
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);

  // NOTE(review): O_RDONLY is passed as the access mode here, whereas other callers of
  // taosCheckAccessFile in this file pass TD_FILE_ACCESS_* flags - confirm this is intended
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
           TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1047

1048
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
6✔
1049
                                       SMountInfo *pMountInfo) {
1050
  int32_t code = 0, lino = 0;
6✔
1051
  pMountInfo->dnodeId = pReq->dnodeId;
6✔
1052
  pMountInfo->mountUid = pReq->mountUid;
6✔
1053
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
6✔
1054
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
6✔
1055
  pMountInfo->ignoreExist = pReq->ignoreExist;
6✔
1056
  pMountInfo->valLen = pReq->valLen;
6✔
1057
  pMountInfo->pVal = pReq->pVal;
6✔
1058
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
6✔
1059
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
3!
1060
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
3✔
1061
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
2!
1062
_exit:
2✔
1063
  if (code != 0) {
6✔
1064
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
4!
1065
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1066
  }
1067
  TAOS_RETURN(code);
6✔
1068
}
1069

1070
/**
 * Handle a retrieve-mount-path request.
 *
 * The request is deserialized and processed on a private copy of the vnode management struct
 * (with its path reset to NULL). Whether or not processing succeeded, the collected SMountInfo
 * is serialized and attached to pMsg->info as the response. Two error codes are tracked:
 *  - code:    failure of deserialization or retrieval;
 *  - rspCode: failure to build the response itself; it takes precedence over code in the result.
 *
 * @param pMgmt vnode management context
 * @param pMsg  in: request payload (pCont/contLen), out: response (info.rsp/info.rspLen)
 * @return 0 on success, otherwise rspCode if the response could not be built, else code
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0, lino = 0;
  int32_t               rspCode = 0;
  int32_t               bufLen = 0;
  void                 *pBuf = NULL;
  SMountInfo            mountInfo = {0};
  SRetrieveMountPathReq req = {0};
  SVnodeMgmt            vndMgmt = *pMgmt;

  vndMgmt.path = NULL;

  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // build the response: size it, allocate it, then serialize into it
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
  pMsg->info.rsp = pBuf;
  pMsg->info.rspLen = bufLen;

_exit:
  if (rspCode == 0 && code == 0) {
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
    int32_t nVgs = 0;
    int32_t iDb = 0;
    while (iDb < nDbs) {
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, iDb);
      nVgs += taosArrayGetSize(pDb->pVgs);
      ++iDb;
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
  } else if (rspCode != 0) {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pBuf);
    code = rspCode;
  } else {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  }

  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1114

1115
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
8✔
1116
                            SMountVnodeReq *req, STfs *pMountTfs) {
1117
  int32_t    code = 0;
8✔
1118
  SVnodeInfo info = {0};
8✔
1119
  char       hostDir[TSDB_FILENAME_LEN] = {0};
8✔
1120
  char       mountDir[TSDB_FILENAME_LEN] = {0};
8✔
1121
  char       mountVnode[32] = {0};
8✔
1122

1123
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
8!
1124
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1125
    return code;
×
1126
  }
1127

1128
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
8✔
1129
  if ((code = taosMkDir(hostDir))) {
8!
1130
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1131
           tstrerror(code), hostDir);
1132
    return code;
×
1133
  }
1134

1135
  info.config = *pCfg;  // copy the config
8✔
1136
  info.state.committed = req->committed;
8✔
1137
  info.state.commitID = req->commitID;
8✔
1138
  info.state.commitTerm = req->commitTerm;
8✔
1139
  info.state.applied = req->committed;
8✔
1140
  info.state.applyTerm = req->commitTerm;
8✔
1141
  info.config.vndStats.numOfSTables = req->numOfSTables;
8✔
1142
  info.config.vndStats.numOfCTables = req->numOfCTables;
8✔
1143
  info.config.vndStats.numOfNTables = req->numOfNTables;
8✔
1144

1145
  SVnodeInfo oldInfo = {0};
8✔
1146
  oldInfo.config = vnodeCfgDefault;
8✔
1147
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
8!
1148
    if (oldInfo.config.dbId != info.config.dbId) {
×
1149
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1150
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1151
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1152
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1153
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1154
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1155

1156
    } else {
1157
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1158
    }
1159
    return code;
×
1160
  }
1161

1162
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1163
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1164
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
8✔
1165
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
8✔
1166
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1167
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
48✔
1168
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1169
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1170
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
40!
1171
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1172
             mountSubDir, hostSubDir, tstrerror(code));
1173
      return code;
×
1174
    }
1175
  }
1176
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
8!
1177
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
8!
1178
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1179
           req->mountName, tstrerror(code), hostDir);
1180
    return code;
×
1181
  }
1182
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
8!
1183
  return 0;
8✔
1184
}
1185

1186
// Handles a mount-vnode request: deserializes it, checks that the addressed replica is this dnode,
// prepares the host vnode directory via vmMountVnode, then opens and starts the vnode and persists
// the vnode and mount lists.
//
// Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized or does not address this
// dnode, 0 when the vnode already exists or the mount succeeds, and otherwise the error code of the
// failing step. On the main path the result is also stored in pMsg->code and the response is cleared.
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // log the learner's own dnode id (previously read from replicas[i] by mistake)
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  SReplica *pReplica = NULL;
  if (pCreateReq->selfIndex != -1) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // log first, free afterwards: the message reads fields that live inside req
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): this path has always returned 0 (the original stored TSDB_CODE_VND_ALREADY_EXIST
    // in a local and then discarded it) - confirm that reporting success here is intended.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;  // from here on the failure path must release the mount tfs

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1279
#endif  // USE_MOUNT
1280

1281
// alter replica doesn't use this, but restore dnode still uses it
1282
// Handles an alter-vnode-type request. The vnode must exist, must not already be a voter and must
// have caught up; the request must address this dnode. When all checks pass, the vnode is closed,
// its replica configuration is altered on disk, and it is reopened and started.
//
// Returns 0 on success and -1 on failure; the validation paths set terrno explicitly.
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count, not the learnerReplicas array: comparing the array against 0 is always
  // false, so learnerSelfIndex was never reset here.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // reject requests whose self indexes do not point into the replica / learner arrays
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // snapshot the wrapper fields before the vnode object is handed to vmCloseVnode
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
1401

1402
// Handles a check-learner-catchup request: succeeds only if the vnode exists, is not already a
// voter, and vnodeIsCatchUp reports 1 for it. The vnode itself is not modified.
//
// Returns 0 when the learner has caught up, otherwise -1 with terrno set.
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count, not the learnerReplicas array: comparing the array against 0 is always
  // false, so this reset never ran.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1449

1450
// Handles a disable-vnode-write request by copying req.disable into the vnode's `disable` field.
//
// Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_INVALID_MSG when the request cannot
// be deserialized, or to TSDB_CODE_VND_NOT_EXIST when the vnode cannot be acquired.
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};

  int32_t parsed = tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req);
  if (parsed != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode != NULL) {
    pVnode->disable = req.disable;
    vmReleaseVnode(pMgmt, pVnode);
    return 0;
  }

  // nothing was acquired, so there is nothing to release on this path
  dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
  terrno = TSDB_CODE_VND_NOT_EXIST;
  return -1;
}
1471

1472
// Handles an alter-hash-range request that turns vnode srcVgId into vnode dstVgId.
//
// Steps: fail if dstVgId already exists; acquire the source vnode; record dstVgId in its toVgId
// field and persist the vnode list; close the source vnode; alter the hash range on disk; open and
// start the vnode at the destination path; persist the vnode list again.
//
// Returns 0 on success and -1 on failure; the validation paths set terrno explicitly.
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // snapshot the wrapper fields (with the new vgId) before the source vnode is closed
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // nothing has been altered yet: undo the in-memory change and drop the reference acquired
    // above, which was previously leaked on this path
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1557

1558
// Handles an alter-vnode-replica request: validates the replica layout and that the request
// addresses this dnode, then closes the vnode, alters its replica configuration on disk, and
// reopens and starts it.
//
// Returns 0 on success and -1 on failure; the validation paths set terrno explicitly.
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  // the trailing "dnode:%d" field is the replica's dnode id (the port was printed twice before)
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // reject requests whose self indexes do not point into the replica / learner arrays
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // snapshot the wrapper fields before the vnode object is handed to vmCloseVnode
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1661

1662
// Handles a drop-vnode request: marks the vnode as dropped, persists the vnode list, closes the
// vnode, then persists the list once more.
//
// Returns 0 on success. On failure returns TSDB_CODE_INVALID_MSG (bad request),
// TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL (request addressed to another dnode),
// TSDB_CODE_VND_NOT_EXIST (vnode not found), or the error from the first vnode-list write. A
// failure of the second vnode-list write is only logged.
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq dropReq = {0};
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  const int32_t vgId = dropReq.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  if (pMgmt->pData->dnodeId != dropReq.dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
    return terrno;
  }

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (!pVnode) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // persist the dropped flag first; roll it back if the list cannot be written
  pVnode->dropped = 1;
  int32_t code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    pVnode->dropped = 0;
    vmReleaseVnode(pMgmt, pVnode);
    return code;
  }

  vmCloseVnode(pMgmt, pVnode, false, false);

  int32_t rewriteCode = vmWriteVnodeListToFile(pMgmt);
  if (rewriteCode != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
1701

1702
// Handles an arbitrator heartbeat request. For every member in the request whose vnode can be
// acquired, it reads the vnode's arb token, updates the vnode's arb term, and appends a response
// member. The serialized response is attached to pMsg->info.
//
// Members whose vnode is missing, or whose token/term call fails, are logged and skipped.
//
// Returns -1 (terrno = TSDB_CODE_INVALID_MSG) when the request cannot be deserialized; otherwise
// returns terrno, which is TSDB_CODE_SUCCESS on the success path.
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // terrno is returned as-is; presumably set by taosArrayInit - TODO confirm
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // first pass with a NULL buffer only computes the serialized length
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // terrno is returned as-is; presumably set by rpcMallocCont - TODO confirm
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
1791

1792
SArray *vmGetMsgHandles() {
2,382✔
1793
  int32_t code = -1;
2,382✔
1794
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,382✔
1795
  if (pArray == NULL) goto _OVER;
2,382!
1796

1797
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1798
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1799
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1800
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1801
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1802
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1803
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1804
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1805
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1806
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1807
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1812
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1813
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1814
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1815
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1816
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1817
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1818
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1824
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1825
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,382!
1832
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1834
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1835
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1836
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1837
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1838
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1839
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1840
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1841
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1842
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1843

1844
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1845
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1846
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1847
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1848
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1849
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1850
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1851

1852
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1853
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1854
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,382!
1855
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1856
  // if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1857

1858
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1859
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1860
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1861
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1862
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1863
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1864
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1865

1866
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1867
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1868
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1869
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1870
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1871
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1872
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1873
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1874
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1875
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1876
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1877
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1878

1879
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,382!
1880
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,382!
1881
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,382!
1882
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,382!
1883
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,382!
1884

1885
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,382!
1886
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,382!
1887
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,382!
1888

1889
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
2,382!
1890
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
2,382!
1891
  code = 0;
2,382✔
1892

1893
_OVER:
2,382✔
1894
  if (code != 0) {
2,382!
1895
    taosArrayDestroy(pArray);
×
1896
    return NULL;
×
1897
  } else {
1898
    return pArray;
2,382✔
1899
  }
1900
}
1901

1902
/**
 * Collect the raw write metrics of every vnode in the running hash and hand
 * them to addWriteMetrics().
 *
 * For each vnode that is not marked `failed`, the function:
 *   1. takes a snapshot via vnodeGetRawWriteMetrics(),
 *   2. parses the vnode's configured db name with tNameFromString(),
 *   3. reports the snapshot through addWriteMetrics(), and
 *   4. on success, calls vnodeResetRawWriteMetrics() with that same snapshot.
 * Failures of any step are logged with dError() and the vnode is skipped;
 * they never abort the traversal.
 *
 * The whole traversal runs under a read lock on pMgmt->hashLock.
 *
 * @param pMgmt      vnode management object owning the running hash and lock
 * @param clusterId  cluster id forwarded unchanged to addWriteMetrics()
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-increment so that every `continue`
  // below moves on to the next entry. Advancing only at the bottom of a
  // while-body would make `continue` re-visit the same entry forever while
  // the read lock is held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode == NULL || *ppVnode == NULL) {
      continue;
    }

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    // Add the metrics to the global metrics system with cluster ID
    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      // Leave the vnode's counters untouched so nothing is lost when reporting fails.
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // Reset only after a successful report. The snapshot is passed back in;
    // NOTE(review): presumably so the reset accounts for exactly what was
    // reported - confirm against vnodeResetRawWriteMetrics().
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc