• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4712

06 Sep 2025 04:27PM UTC coverage: 58.144% (-1.0%) from 59.134%
#4712

push

travis-ci

GitHub
test: update case description (#32878)

133123 of 291691 branches covered (45.64%)

Branch coverage included in aggregate %.

201244 of 283375 relevant lines covered (71.02%)

5637899.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.47
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
98,340✔
30
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
98,340✔
31
  if (pInfo->pVloads == NULL) return;
98,340!
32

33
  tfsUpdateSize(pMgmt->pTfs);
98,340✔
34

35
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
98,340✔
36

37
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
98,340✔
38
  while (pIter) {
364,254✔
39
    SVnodeObj **ppVnode = pIter;
265,914✔
40
    if (ppVnode == NULL || *ppVnode == NULL) continue;
265,914!
41

42
    SVnodeObj *pVnode = *ppVnode;
265,914✔
43
    SVnodeLoad vload = {.vgId = pVnode->vgId};
265,914✔
44
    if (!pVnode->failed) {
265,914!
45
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
265,914!
46
        dError("failed to get vnode load");
×
47
      }
48
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
265,914✔
49
    }
50
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
531,828!
51
      dError("failed to push vnode load");
×
52
    }
53
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
265,914✔
54
  }
55

56
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
98,340✔
57
}
58

59
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
60
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
61
  if (!pInfo->pVloads) return;
×
62

63
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
64

65
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
66
  while (pIter) {
×
67
    SVnodeObj **ppVnode = pIter;
×
68
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
69

70
    SVnodeObj *pVnode = *ppVnode;
×
71
    if (!pVnode->failed) {
×
72
      SVnodeLoadLite vload = {0};
×
73
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
74
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
75
          taosArrayDestroy(pInfo->pVloads);
×
76
          pInfo->pVloads = NULL;
×
77
          break;
×
78
        }
79
      }
80
    }
81
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
82
  }
83

84
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
85
}
86

87
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
1✔
88
  SMonVloadInfo vloads = {0};
1✔
89
  vmGetVnodeLoads(pMgmt, &vloads, true);
1✔
90

91
  SArray *pVloads = vloads.pVloads;
1✔
92
  if (pVloads == NULL) return;
1!
93

94
  int32_t totalVnodes = 0;
1✔
95
  int32_t masterNum = 0;
1✔
96
  int64_t numOfSelectReqs = 0;
1✔
97
  int64_t numOfInsertReqs = 0;
1✔
98
  int64_t numOfInsertSuccessReqs = 0;
1✔
99
  int64_t numOfBatchInsertReqs = 0;
1✔
100
  int64_t numOfBatchInsertSuccessReqs = 0;
1✔
101

102
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
7✔
103
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
6✔
104
    numOfSelectReqs += pLoad->numOfSelectReqs;
6✔
105
    numOfInsertReqs += pLoad->numOfInsertReqs;
6✔
106
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
6✔
107
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
6✔
108
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
6✔
109
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
6!
110
      masterNum++;
6✔
111
    }
112
    totalVnodes++;
6✔
113
  }
114

115
  pInfo->vstat.totalVnodes = totalVnodes;
1✔
116
  pInfo->vstat.masterNum = masterNum;
1✔
117
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
1✔
118
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
1✔
119
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
1✔
120
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
1✔
121
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
1✔
122
  pMgmt->state.totalVnodes = totalVnodes;
1✔
123
  pMgmt->state.masterNum = masterNum;
1✔
124
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
1✔
125
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
1✔
126
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
1✔
127
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
1✔
128
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
1✔
129

130
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
1!
131
    dError("failed to get tfs monitor info");
×
132
  }
133
  taosArrayDestroy(pVloads);
1✔
134
}
135

136
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
1✔
137
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
1✔
138
  if (list_size == 0) return;
1!
139
  int32_t *vgroup_ids;
140
  char   **keys;
141
  int      r = 0;
×
142
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
143
  if (r) {
×
144
    dError("failed to get vgroup ids");
×
145
    return;
×
146
  }
147
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
148
  for (int i = 0; i < list_size; i++) {
×
149
    int32_t vgroup_id = vgroup_ids[i];
×
150
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
151
    if (vnode == NULL) {
×
152
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
153
      if (r) {
×
154
        dError("failed to delete monitor sample key:%s", keys[i]);
×
155
      }
156
    }
157
  }
158
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
159
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
160
  if (keys) taosMemoryFree(keys);
×
161
  return;
×
162
}
163

164
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
165
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
166
    return;
×
167
  }
168

169
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
170
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
171
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
172
  if (pValidVgroups == NULL) {
×
173
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
174
    return;
×
175
  }
176

177
  while (pIter != NULL) {
×
178
    SVnodeObj **ppVnode = pIter;
×
179
    if (ppVnode && *ppVnode) {
×
180
      int32_t vgId = (*ppVnode)->vgId;
×
181
      char    dummy = 1;  // hash table value (we only care about the key)
×
182
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
183
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
184
      }
185
    }
186
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
187
  }
188
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
189

190
  // Clean expired metrics by removing metrics for non-existent vgroups
191
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
192
  if (code != TSDB_CODE_SUCCESS) {
×
193
    dError("failed to clean expired metrics, code:%d", code);
×
194
  }
195

196
  taosHashCleanup(pValidVgroups);
×
197
}
198

199
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
9,922✔
200
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
9,922✔
201

202
  pCfg->vgId = pCreate->vgId;
9,922✔
203
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
9,922✔
204
  pCfg->dbId = pCreate->dbUid;
9,922✔
205
  pCfg->szPage = pCreate->pageSize * 1024;
9,922✔
206
  pCfg->szCache = pCreate->pages;
9,922✔
207
  pCfg->cacheLast = pCreate->cacheLast;
9,922✔
208
  pCfg->cacheLastSize = pCreate->cacheLastSize;
9,922✔
209
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
9,922✔
210
  pCfg->isWeak = true;
9,922✔
211
  pCfg->isTsma = pCreate->isTsma;
9,922✔
212
  pCfg->tsdbCfg.compression = pCreate->compression;
9,922✔
213
  pCfg->tsdbCfg.precision = pCreate->precision;
9,922✔
214
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
9,922✔
215
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
9,922✔
216
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
9,922✔
217
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
9,922✔
218
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
9,922✔
219
  pCfg->tsdbCfg.minRows = pCreate->minRows;
9,922✔
220
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
9,922✔
221
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
9,922!
222
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
×
223
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
×
224
    if (i == 0) {
×
225
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
×
226
    }
227
  }
228
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
229
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
9,921✔
230
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
9,921✔
231
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3✔
232
  }
233
#else
234
  pCfg->tsdbCfg.encryptAlgorithm = 0;
235
#endif
236

237
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
9,921✔
238
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
9,921✔
239
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
9,921✔
240
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
9,921✔
241
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
9,921✔
242
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
9,921✔
243
  pCfg->walCfg.level = pCreate->walLevel;
9,921✔
244
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
245
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
9,921✔
246
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
9,921✔
247
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3✔
248
  }
249
#else
250
  pCfg->walCfg.encryptAlgorithm = 0;
251
#endif
252

253
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
254
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
9,921✔
255
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
9,921✔
256
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3✔
257
  }
258
#else
259
  pCfg->tdbEncryptAlgorithm = 0;
260
#endif
261

262
  pCfg->sttTrigger = pCreate->sstTrigger;
9,921✔
263
  pCfg->hashBegin = pCreate->hashBegin;
9,921✔
264
  pCfg->hashEnd = pCreate->hashEnd;
9,921✔
265
  pCfg->hashMethod = pCreate->hashMethod;
9,921✔
266
  pCfg->hashPrefix = pCreate->hashPrefix;
9,921✔
267
  pCfg->hashSuffix = pCreate->hashSuffix;
9,921✔
268
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
9,921✔
269

270
  pCfg->ssChunkSize = pCreate->ssChunkSize;
9,921✔
271
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
9,921✔
272
  pCfg->ssCompact = pCreate->ssCompact;
9,921✔
273

274
  pCfg->standby = 0;
9,921✔
275
  pCfg->syncCfg.replicaNum = 0;
9,921✔
276
  pCfg->syncCfg.totalReplicaNum = 0;
9,921✔
277
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
9,921✔
278

279
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
9,921✔
280
  for (int32_t i = 0; i < pCreate->replica; ++i) {
22,877✔
281
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
12,956✔
282
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
12,956✔
283
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
12,956✔
284
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
12,956✔
285
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
12,956✔
286
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
12,956✔
287
    pCfg->syncCfg.replicaNum++;
12,956✔
288
  }
289
  if (pCreate->selfIndex != -1) {
9,921✔
290
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
9,730✔
291
  }
292
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
10,113✔
293
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
192✔
294
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
192✔
295
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
192✔
296
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
192✔
297
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
192✔
298
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
192✔
299
    pCfg->syncCfg.totalReplicaNum++;
192✔
300
  }
301
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
9,921✔
302
  if (pCreate->learnerSelfIndex != -1) {
9,921✔
303
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
192✔
304
  }
305
}
9,921✔
306

307
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
9,922✔
308
  pCfg->vgId = pCreate->vgId;
9,922✔
309
  pCfg->vgVersion = pCreate->vgVersion;
9,922✔
310
  pCfg->dropped = 0;
9,922✔
311
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
9,922✔
312
}
9,922✔
313

314
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
9,914✔
315
  if (pReq->isTsma) {
9,914!
316
    SMsgHead *smaMsg = pReq->pTsma;
×
317
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
×
318
    return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
×
319
  }
320
  return 0;
9,914✔
321
}
322

323
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
9,912✔
324
  SCreateVnodeReq req = {0};
9,912✔
325
  SVnodeCfg       vnodeCfg = {0};
9,912✔
326
  SWrapperCfg     wrapperCfg = {0};
9,912✔
327
  int32_t         code = -1;
9,912✔
328
  char            path[TSDB_FILENAME_LEN] = {0};
9,912✔
329

330
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
9,912!
331
    return TSDB_CODE_INVALID_MSG;
×
332
  }
333

334
  if (req.learnerReplica == 0) {
9,895✔
335
    req.learnerSelfIndex = -1;
9,707✔
336
  }
337

338
  dInfo(
9,895!
339
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
340
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
341
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
342
      "precision:%d compression:%d minRows:%d maxRows:%d"
343
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
344
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
345
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
346
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
347
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
348
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
349
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
350
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
351
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
352
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
353
      req.encryptAlgorithm);
354

355
  for (int32_t i = 0; i < req.replica; ++i) {
22,862✔
356
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
12,948!
357
          req.replicas[i].id);
358
  }
359
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
10,106✔
360
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
192!
361
          req.learnerReplicas[i].port, req.replicas[i].id);
362
  }
363

364
  SReplica *pReplica = NULL;
9,914✔
365
  if (req.selfIndex != -1) {
9,914✔
366
    pReplica = &req.replicas[req.selfIndex];
9,722✔
367
  } else {
368
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
192✔
369
  }
370
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
9,914!
371
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
9,914✔
372
    (void)tFreeSCreateVnodeReq(&req);
1✔
373

374
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
375
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
×
376
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
377
    return code;
×
378
  }
379

380
  if (req.encryptAlgorithm == DND_CA_SM4) {
9,913✔
381
    if (strlen(tsEncryptKey) == 0) {
3!
382
      (void)tFreeSCreateVnodeReq(&req);
×
383
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
384
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
385
      return code;
×
386
    }
387
  }
388

389
  vmGenerateVnodeCfg(&req, &vnodeCfg);
9,913✔
390

391
  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
9,914!
392
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
×
393
    goto _OVER;
×
394
  }
395

396
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
9,914✔
397

398
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
9,914✔
399
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
9,909!
400
    dError("vgId:%d, already exist", req.vgId);
90!
401
    (void)tFreeSCreateVnodeReq(&req);
90✔
402
    vmReleaseVnode(pMgmt, pVnode);
90✔
403
    code = TSDB_CODE_VND_ALREADY_EXIST;
90✔
404
    return 0;
90✔
405
  }
406

407
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
9,819✔
408
  if (diskPrimary < 0) {
9,813!
409
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
9,814✔
410
  }
411
  wrapperCfg.diskPrimary = diskPrimary;
9,823✔
412

413
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
9,823✔
414

415
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
9,823!
416
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
417
    vmReleaseVnode(pMgmt, pVnode);
×
418
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
419
    (void)tFreeSCreateVnodeReq(&req);
×
420
    return code;
×
421
  }
422

423
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
9,824✔
424
  if (pImpl == NULL) {
9,824!
425
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
426
    code = terrno != 0 ? terrno : -1;
×
427
    goto _OVER;
×
428
  }
429

430
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
9,824✔
431
  if (code != 0) {
9,824!
432
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
433
    code = terrno != 0 ? terrno : code;
×
434
    goto _OVER;
×
435
  }
436

437
  code = vnodeStart(pImpl);
9,824✔
438
  if (code != 0) {
9,824!
439
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
440
    goto _OVER;
×
441
  }
442

443
  code = vmWriteVnodeListToFile(pMgmt);
9,824✔
444
  if (code != 0) {
9,824!
445
    code = terrno != 0 ? terrno : code;
×
446
    goto _OVER;
×
447
  }
448

449
_OVER:
9,824✔
450
  vmCleanPrimaryDisk(pMgmt, req.vgId);
9,824✔
451

452
  if (code != 0) {
9,824!
453
    vmCloseFailedVnode(pMgmt, req.vgId);
×
454

455
    vnodeClose(pImpl);
×
456
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
457
  } else {
458
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
9,824!
459
          TMSG_INFO(pMsg->msgType));
460
  }
461

462
  (void)tFreeSCreateVnodeReq(&req);
9,824✔
463
  terrno = code;
9,824✔
464
  return code;
9,824✔
465
}
466

467
#ifdef USE_MOUNT
468
typedef struct {
469
  int64_t dbId;
470
  int32_t vgId;
471
  int32_t diskPrimary;
472
} SMountDbVgId;
473
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
474
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
475

476
static int compareVnodeInfo(const void *p1, const void *p2) {
21✔
477
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
21✔
478
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
21✔
479

480
  if (v1->config.dbId == v2->config.dbId) {
21✔
481
    if (v1->config.vgId == v2->config.vgId) {
12!
482
      return 0;
×
483
    }
484
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
12!
485
  }
486

487
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
9!
488
}
489
static int compareVgDiskPrimary(const void *p1, const void *p2) {
21✔
490
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
21✔
491
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
21✔
492

493
  if (v1->dbId == v2->dbId) {
21✔
494
    if (v1->vgId == v2->vgId) {
12!
495
      return 0;
×
496
    }
497
    return v1->vgId > v2->vgId ? 1 : -1;
12!
498
  }
499

500
  return v1->dbId > v2->dbId ? 1 : -1;
9!
501
}
502

503
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
3✔
504
  int32_t   code = 0, lino = 0;
3✔
505
  TdFilePtr pFile = NULL;
3✔
506
  char     *content = NULL;
3✔
507
  SJson    *pJson = NULL;
3✔
508
  int64_t   size = 0;
3✔
509
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
3✔
510
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
3✔
511
  SArray   *pDisks = NULL;
3✔
512
  // step 1: fetch clusterId from dnode.json
513
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
3✔
514
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
3!
515
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
3!
516
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
3!
517
  if (taosReadFile(pFile, content, size) != size) {
3!
518
    TAOS_CHECK_EXIT(terrno);
×
519
  }
520
  content[size] = '\0';
3✔
521
  pJson = tjsonParse(content);
3✔
522
  if (pJson == NULL) {
3!
523
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
524
  }
525
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
3✔
526
  TAOS_CHECK_EXIT(code);
3!
527
  if (dropped == 1) {
3!
528
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
529
  }
530
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
3✔
531
  TAOS_CHECK_EXIT(code);
3!
532
  if (encryptScope != 0) {
3!
533
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
534
  }
535
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
3✔
536
  TAOS_CHECK_EXIT(code);
3!
537
  if (clusterId == 0) {
3!
538
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
539
  }
540
  pMountInfo->clusterId = clusterId;
3✔
541
  if (content != NULL) taosMemoryFreeClear(content);
3!
542
  if (pJson != NULL) {
3!
543
    cJSON_Delete(pJson);
3✔
544
    pJson = NULL;
3✔
545
  }
546
  if (pFile != NULL) taosCloseFile(&pFile);
3!
547
  // step 2: fetch dataDir from dnode/config/local.json
548
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
3!
549
  int32_t nDisks = taosArrayGetSize(pDisks);
3✔
550
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
3!
551
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
552
  }
553
  for (int32_t i = 0; i < nDisks; ++i) {
21✔
554
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
18✔
555
    if (!pMountInfo->pDisks[pDisk->level]) {
18✔
556
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
6✔
557
      if (!pMountInfo->pDisks[pDisk->level]) {
6!
558
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
559
      }
560
    }
561
    char *pDir = taosStrdup(pDisk->dir);
18!
562
    if (pDir == NULL) {
18!
563
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
564
    }
565
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
18!
566
      // put the primary disk to the first position of level 0
567
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
3!
568
        taosMemFree(pDir);
×
569
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
570
      }
571
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
30!
572
      taosMemFree(pDir);
×
573
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
574
    }
575
  }
576
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
12✔
577
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
9✔
578
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
9!
579
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
580
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
581
    }
582
  }
583
_exit:
3✔
584
  if (content != NULL) taosMemoryFreeClear(content);
3!
585
  if (pJson != NULL) cJSON_Delete(pJson);
3!
586
  if (pFile != NULL) taosCloseFile(&pFile);
3!
587
  if (pDisks != NULL) {
3!
588
    taosArrayDestroy(pDisks);
3✔
589
    pDisks = NULL;
3✔
590
  }
591
  if (code != 0) {
3!
592
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
593
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
594
  } else {
595
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
3!
596
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
597
  }
598
  TAOS_RETURN(code);
3✔
599
}
600

601
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
3✔
602
  int32_t       code = 0, lino = 0;
3✔
603
  SWrapperCfg  *pCfgs = NULL;
3✔
604
  int32_t       numOfVnodes = 0;
3✔
605
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
3✔
606
  TdDirPtr      pDir = NULL;
3✔
607
  TdDirEntryPtr de = NULL;
3✔
608
  SVnodeMgmt    vnodeMgmt = {0};
3✔
609
  SArray       *pVgCfgs = NULL;
3✔
610
  SArray       *pDbInfos = NULL;
3✔
611
  SArray       *pDiskPrimarys = NULL;
3✔
612

613
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
3✔
614
  vnodeMgmt.path = path;
3✔
615
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
3!
616
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
3!
617
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
3!
618
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
3!
619

620
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
3✔
621
  int32_t nVgDropped = 0, j = 0;
3✔
622
  for (int32_t i = 0; i < numOfVnodes; ++i) {
15✔
623
    SWrapperCfg *pCfg = &pCfgs[i];
12✔
624
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
625
    if (nDiskLevel0 > 1) {
12!
626
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
12✔
627
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
12!
628
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
12✔
629
                     pCfg->vgId);
630
    }
631
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
12!
632
    if (pCfg->dropped) {
12!
633
      ++nVgDropped;
×
634
      continue;
×
635
    }
636
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
12!
637
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
638
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
639
    }
640
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
12✔
641
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
12!
642
    if (pInfo->config.syncCfg.replicaNum > 1) {
12!
643
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
644
             pInfo->config.syncCfg.replicaNum);
645
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
646
    } else if (pInfo->config.vgId != pCfg->vgId) {
12!
647
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
648
             pCfg->vgId);
649
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
650
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
12!
651
               pInfo->config.walCfg.encryptAlgorithm) {
12!
652
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
×
653
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
654
             pInfo->config.tsdbCfg.encryptAlgorithm);
655
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
656
    }
657
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
12✔
658
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
12!
659
  }
660
  if (nVgDropped > 0) {
3!
661
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
662
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
663
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
664
  }
665
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
3✔
666
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
3✔
667
  if (nVgCfg != nDiskPrimary) {
3!
668
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
669
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
670
  }
671
  if (nVgCfg > 1) {
3!
672
    taosArraySort(pVgCfgs, compareVnodeInfo);
3✔
673
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
3✔
674
  }
675

676
  int64_t clusterId = pMountInfo->clusterId;
3✔
677
  int64_t dbId = 0, vgId = 0, nDb = 0;
3✔
678
  for (int32_t i = 0; i < nVgCfg; ++i) {
11✔
679
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
9✔
680
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
9✔
681
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
1!
682
             pInfo->config.syncCfg.nodeInfo->clusterId);
683
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
1!
684
    }
685
    if (dbId != pInfo->config.dbId) {
8✔
686
      dbId = pInfo->config.dbId;
4✔
687
      ++nDb;
4✔
688
    }
689
    if (vgId == pInfo->config.vgId) {
8!
690
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
691
    } else {
692
      vgId = pInfo->config.vgId;
8✔
693
    }
694
  }
695

696
  if (nDb > 0) {
2!
697
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
2!
698
    int32_t dbIdx = -1;
2✔
699
    for (int32_t i = 0; i < nVgCfg; ++i) {
10✔
700
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
8✔
701
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
8✔
702
      SMountDbInfo *pDbInfo = NULL;
8✔
703
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
8✔
704
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
4✔
705
        pDbInfo->dbId = pVgCfg->config.dbId;
4✔
706
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
4✔
707
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
4!
708
      } else {
709
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
4✔
710
      }
711
      SMountVgInfo vgInfo = {
8✔
712
          .diskPrimary = pDiskPrimary->diskPrimary,
8✔
713
          .vgId = pVgCfg->config.vgId,
8✔
714
          .dbId = pVgCfg->config.dbId,
8✔
715
          .cacheLastSize = pVgCfg->config.cacheLastSize,
8✔
716
          .szPage = pVgCfg->config.szPage,
8✔
717
          .szCache = pVgCfg->config.szCache,
8✔
718
          .szBuf = pVgCfg->config.szBuf,
8✔
719
          .cacheLast = pVgCfg->config.cacheLast,
8✔
720
          .standby = pVgCfg->config.standby,
8✔
721
          .hashMethod = pVgCfg->config.hashMethod,
8✔
722
          .hashBegin = pVgCfg->config.hashBegin,
8✔
723
          .hashEnd = pVgCfg->config.hashEnd,
8✔
724
          .hashPrefix = pVgCfg->config.hashPrefix,
8✔
725
          .hashSuffix = pVgCfg->config.hashSuffix,
8✔
726
          .sttTrigger = pVgCfg->config.sttTrigger,
8✔
727
          .replications = pVgCfg->config.syncCfg.replicaNum,
8✔
728
          .precision = pVgCfg->config.tsdbCfg.precision,
8✔
729
          .compression = pVgCfg->config.tsdbCfg.compression,
8✔
730
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
8✔
731
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
8✔
732
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
8✔
733
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
8✔
734
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
8✔
735
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
8✔
736
          .minRows = pVgCfg->config.tsdbCfg.minRows,
8✔
737
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
8✔
738
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
8✔
739
          .ssChunkSize = pVgCfg->config.ssChunkSize,
8✔
740
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
8✔
741
          .ssCompact = pVgCfg->config.ssCompact,
8✔
742
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
8✔
743
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
8✔
744
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
8✔
745
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
8✔
746
          .walSegSize = pVgCfg->config.walCfg.segSize,
8✔
747
          .walLevel = pVgCfg->config.walCfg.level,
8✔
748
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
8✔
749
          .committed = pVgCfg->state.committed,
8✔
750
          .commitID = pVgCfg->state.commitID,
8✔
751
          .commitTerm = pVgCfg->state.commitTerm,
8✔
752
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
8✔
753
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
8✔
754
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
8✔
755
      };
756
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
16!
757
    }
758
  }
759

760
  pMountInfo->pDbs = pDbInfos;
2✔
761

762
_exit:
3✔
763
  if (code != 0) {
3✔
764
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
1!
765
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
766
  }
767
  taosArrayDestroy(pDiskPrimarys);
3✔
768
  taosArrayDestroy(pVgCfgs);
3✔
769
  taosMemoryFreeClear(pCfgs);
3!
770
  TAOS_RETURN(code);
3✔
771
}
772

773
/**
774
 *   Retrieve the stables from vnode meta.
775
 */
776
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
2✔
777
  int32_t code = 0, lino = 0;
2✔
778
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
2✔
779
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
2✔
780
  SArray *suidList = NULL;
2✔
781
  SArray *pCols = NULL;
2✔
782
  SArray *pTags = NULL;
2✔
783
  SArray *pColExts = NULL;
2✔
784
  SArray *pTagExts = NULL;
2✔
785

786
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
2✔
787
  for (int32_t i = 0; i < nDb; ++i) {
6✔
788
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
4✔
789
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
4✔
790
    for (int32_t j = 0; j < nVg; ++j) {
4!
791
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
4✔
792
      SVnode        vnode = {
4✔
793
                 .config.vgId = pVgInfo->vgId,
4✔
794
                 .config.dbId = pVgInfo->dbId,
4✔
795
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
4✔
796
                 .config.szPage = pVgInfo->szPage,
4✔
797
                 .config.szCache = pVgInfo->szCache,
4✔
798
                 .config.szBuf = pVgInfo->szBuf,
4✔
799
                 .config.cacheLast = pVgInfo->cacheLast,
4✔
800
                 .config.standby = pVgInfo->standby,
4✔
801
                 .config.hashMethod = pVgInfo->hashMethod,
4✔
802
                 .config.hashBegin = pVgInfo->hashBegin,
4✔
803
                 .config.hashEnd = pVgInfo->hashEnd,
4✔
804
                 .config.hashPrefix = pVgInfo->hashPrefix,
4✔
805
                 .config.hashSuffix = pVgInfo->hashSuffix,
4✔
806
                 .config.sttTrigger = pVgInfo->sttTrigger,
4✔
807
                 .config.syncCfg.replicaNum = pVgInfo->replications,
4✔
808
                 .config.tsdbCfg.precision = pVgInfo->precision,
4✔
809
                 .config.tsdbCfg.compression = pVgInfo->compression,
4✔
810
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
4✔
811
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
4✔
812
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
4✔
813
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
4✔
814
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
4✔
815
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
4✔
816
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
4✔
817
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
4✔
818
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
4✔
819
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
4✔
820
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
4✔
821
                 .config.ssCompact = pVgInfo->ssCompact,
4✔
822
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
4✔
823
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
4✔
824
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
4✔
825
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
4✔
826
                 .config.walCfg.segSize = pVgInfo->walSegSize,
4✔
827
                 .config.walCfg.level = pVgInfo->walLevel,
4✔
828
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
4✔
829
                 .diskPrimary = pVgInfo->diskPrimary,
4✔
830
      };
831
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
4✔
832
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
4✔
833
               pVgInfo->vgId);
834
      vnode.path = path;
4✔
835

836
      int32_t rollback = vnodeShouldRollback(&vnode);
4✔
837
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
4!
838
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
839
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
840
        TAOS_CHECK_EXIT(code);
×
841
      } else {
842
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
4!
843
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
844

845
        SMetaReader mr = {0};
4✔
846
        tb_uid_t    suid = 0;
4✔
847
        SMeta      *pMeta = vnode.pMeta;
4✔
848

849
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
4✔
850
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
4!
851
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
852
        }
853
        taosArrayClear(suidList);
4✔
854
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
4!
855
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
4!
856
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
857
        int32_t nStbs = taosArrayGetSize(suidList);
4✔
858
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
4!
859
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
860
        }
861
        for (int32_t i = 0; i < nStbs; ++i) {
16✔
862
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
12✔
863
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
12!
864
                pVgInfo->dbId, suid, pReq->dnodeId);
865
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
12!
866
            TSDB_CHECK_CODE(code, lino, _exit0);
×
867
          }
868
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
12!
869
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
12!
870
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
871
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
872
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
873
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
874
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
875
          }
876
          SMountStbInfo stbInfo = {
12✔
877
              .req.source = TD_REQ_FROM_APP,
878
              .req.suid = suid,
879
              .req.colVer = mr.me.stbEntry.schemaRow.version,
12✔
880
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
12✔
881
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
12✔
882
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
12✔
883
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
12✔
884
          };
885
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
12✔
886
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
12!
887
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
888
          }
889
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
12!
890
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
891
          }
892

893
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
12!
894
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
895
          }
896
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
12!
897
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
898
          }
899
          taosArrayClear(pCols);
12✔
900
          taosArrayClear(pTags);
12✔
901
          taosArrayClear(pColExts);
12✔
902
          taosArrayClear(pTagExts);
12✔
903
          stbInfo.req.pColumns = pCols;
12✔
904
          stbInfo.req.pTags = pTags;
12✔
905
          stbInfo.pColExts = pColExts;
12✔
906
          stbInfo.pTagExts = pTagExts;
12✔
907

908
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
72✔
909
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
60✔
910
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
60✔
911
            SFieldWithOptions col = {
60✔
912
                .type = pSchema->type,
60✔
913
                .flags = pSchema->flags,
60✔
914
                .bytes = pSchema->bytes,
60✔
915
                .compress = pColComp->alg,
60✔
916
            };
917
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
60✔
918
            if (pSchema->colId != pColComp->id) {
60!
919
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
920
            }
921
            if (mr.me.pExtSchemas) {
60!
922
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
923
            }
924
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
60!
925
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
120!
926
          }
927
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
28✔
928
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
16✔
929
            SField   tag = {
16✔
930
                  .type = pSchema->type,
16✔
931
                  .flags = pSchema->flags,
16✔
932
                  .bytes = pSchema->bytes,
16✔
933
            };
934
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
16✔
935
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
16!
936
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
32!
937
          }
938
          tDecoderClear(&mr.coder);
12✔
939

940
          // serialize the SMountStbInfo
941
          int32_t firstPartLen = 0;
12✔
942
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
12✔
943
          if (msgLen <= 0) {
12!
944
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
945
          }
946
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
12!
947
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
12!
948
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
12✔
949
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
12✔
950
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
12!
951
            taosMemoryFree(pBuf);
×
952
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
953
          }
954
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
24!
955
            taosMemoryFree(pBuf);
×
956
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
957
          }
958
        }
959
      _exit0:
4✔
960
        metaReaderClear(&mr);
4✔
961
        metaClose(&vnode.pMeta);
4✔
962
        TAOS_CHECK_EXIT(code);
4!
963
      }
964
      break;  // retrieve stbs from one vnode is enough
4✔
965
    }
966
  }
967
_exit:
2✔
968
  if (code != 0) {
2!
969
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
970
           pReq->dnodeId, tstrerror(code), path);
971
  }
972
  taosArrayDestroy(suidList);
2✔
973
  taosArrayDestroy(pCols);
2✔
974
  taosArrayDestroy(pTags);
2✔
975
  taosArrayDestroy(pColExts);
2✔
976
  taosArrayDestroy(pTagExts);
2✔
977
  TAOS_RETURN(code);
2✔
978
}
979

980
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
9✔
981
  int32_t code = 0, lino = 0;
9✔
982
  int32_t retryTimes = 0;
9✔
983
  char    filepath[PATH_MAX] = {0};
9✔
984
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
9✔
985
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
9!
986
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
987
                  code, lino, _exit, terrno);
988
  int32_t ret = 0;
9✔
989
  do {
990
    ret = taosLockFile(*pFile);
11✔
991
    if (ret == 0) break;
11✔
992
    taosMsleep(1000);
3✔
993
    ++retryTimes;
3✔
994
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
3!
995
  } while (retryTimes < retryLimit);
3✔
996
  TAOS_CHECK_EXIT(ret);
9✔
997
_exit:
8✔
998
  if (code != 0) {
9✔
999
    (void)taosCloseFile(pFile);
1✔
1000
    *pFile = NULL;
1✔
1001
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
1!
1002
           filepath);
1003
  }
1004
  TAOS_RETURN(code);
9✔
1005
}
1006

1007
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
6✔
1008
  int32_t code = 0, lino = 0;
6✔
1009
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
6✔
1010
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
6✔
1011
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
5✔
1012
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
4✔
1013
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
4✔
1014
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
3✔
1015
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
3!
1016
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
3✔
1017
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
3!
1018
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
3✔
1019
           TD_DIRSEP);
1020
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
3!
1021
_exit:
3✔
1022
  if (code != 0) {
6✔
1023
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
3!
1024
           pReq->dnodeId, tstrerror(code), path);
1025
  }
1026
  TAOS_RETURN(code);
6✔
1027
}
1028

1029
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
6✔
1030
                                       SMountInfo *pMountInfo) {
1031
  int32_t code = 0, lino = 0;
6✔
1032
  pMountInfo->dnodeId = pReq->dnodeId;
6✔
1033
  pMountInfo->mountUid = pReq->mountUid;
6✔
1034
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
6✔
1035
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
6✔
1036
  pMountInfo->ignoreExist = pReq->ignoreExist;
6✔
1037
  pMountInfo->valLen = pReq->valLen;
6✔
1038
  pMountInfo->pVal = pReq->pVal;
6✔
1039
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
6✔
1040
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
3!
1041
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
3✔
1042
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
2!
1043
_exit:
2✔
1044
  if (code != 0) {
6✔
1045
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
4!
1046
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1047
  }
1048
  TAOS_RETURN(code);
6✔
1049
}
1050

1051
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
6✔
1052
  int32_t               code = 0, lino = 0;
6✔
1053
  int32_t               rspCode = 0;
6✔
1054
  SVnodeMgmt            vndMgmt = {0};
6✔
1055
  SMountInfo            mountInfo = {0};
6✔
1056
  void                 *pBuf = NULL;
6✔
1057
  int32_t               bufLen = 0;
6✔
1058
  SRetrieveMountPathReq req = {0};
6✔
1059

1060
  vndMgmt = *pMgmt;
6✔
1061
  vndMgmt.path = NULL;
6✔
1062
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
6!
1063
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
6!
1064
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
6✔
1065
_end:
2✔
1066
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
6!
1067
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
6!
1068
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
6!
1069
  pMsg->info.rsp = pBuf;
6✔
1070
  pMsg->info.rspLen = bufLen;
6✔
1071
_exit:
6✔
1072
  if (rspCode != 0) {
6!
1073
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1074
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1075
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1076
    rpcFreeCont(pBuf);
×
1077
    code = rspCode;
×
1078
  } else if (code != 0) {
6✔
1079
    // the client would receive the response with error msg
1080
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
4!
1081
           req.dnodeId, tstrerror(code), req.mountPath);
1082
  } else {
1083
    int32_t nVgs = 0;
2✔
1084
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
2✔
1085
    for (int32_t i = 0; i < nDbs; ++i) {
6✔
1086
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
4✔
1087
      nVgs += taosArrayGetSize(pDb->pVgs);
4✔
1088
    }
1089
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
2!
1090
  }
1091
  taosMemFreeClear(vndMgmt.path);
6!
1092
  tFreeMountInfo(&mountInfo, false);
6✔
1093
  TAOS_RETURN(code);
6✔
1094
}
1095

1096
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
8✔
1097
                            SMountVnodeReq *req, STfs *pMountTfs) {
1098
  int32_t    code = 0;
8✔
1099
  SVnodeInfo info = {0};
8✔
1100
  char       hostDir[TSDB_FILENAME_LEN] = {0};
8✔
1101
  char       mountDir[TSDB_FILENAME_LEN] = {0};
8✔
1102
  char       mountVnode[32] = {0};
8✔
1103

1104
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
8!
1105
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1106
    return code;
×
1107
  }
1108

1109
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
8✔
1110
  if ((code = taosMkDir(hostDir))) {
8!
1111
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1112
           tstrerror(code), hostDir);
1113
    return code;
×
1114
  }
1115

1116
  info.config = *pCfg;  // copy the config
8✔
1117
  info.state.committed = req->committed;
8✔
1118
  info.state.commitID = req->commitID;
8✔
1119
  info.state.commitTerm = req->commitTerm;
8✔
1120
  info.state.applied = req->committed;
8✔
1121
  info.state.applyTerm = req->commitTerm;
8✔
1122
  info.config.vndStats.numOfSTables = req->numOfSTables;
8✔
1123
  info.config.vndStats.numOfCTables = req->numOfCTables;
8✔
1124
  info.config.vndStats.numOfNTables = req->numOfNTables;
8✔
1125

1126
  SVnodeInfo oldInfo = {0};
8✔
1127
  oldInfo.config = vnodeCfgDefault;
8✔
1128
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
8!
1129
    if (oldInfo.config.dbId != info.config.dbId) {
×
1130
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1131
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1132
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1133
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1134
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1135
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1136

1137
    } else {
1138
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1139
    }
1140
    return code;
×
1141
  }
1142

1143
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1144
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1145
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
8✔
1146
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
8✔
1147
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1148
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
48✔
1149
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1150
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1151
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
40!
1152
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1153
             mountSubDir, hostSubDir, tstrerror(code));
1154
      return code;
×
1155
    }
1156
  }
1157
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
8!
1158
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
8!
1159
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1160
           req->mountName, tstrerror(code), hostDir);
1161
    return code;
×
1162
  }
1163
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
8!
1164
  return 0;
8✔
1165
}
1166

1167
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
8✔
1168
  int32_t          code = 0, lino = 0;
8✔
1169
  SMountVnodeReq   req = {0};
8✔
1170
  SCreateVnodeReq *pCreateReq = &req.createReq;
8✔
1171
  SVnodeCfg        vnodeCfg = {0};
8✔
1172
  SWrapperCfg      wrapperCfg = {0};
8✔
1173
  SVnode          *pImpl = NULL;
8✔
1174
  STfs            *pMountTfs = NULL;
8✔
1175
  char             path[TSDB_FILENAME_LEN] = {0};
8✔
1176
  bool             releaseTfs = false;
8✔
1177

1178
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
8!
1179
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1180
    return TSDB_CODE_INVALID_MSG;
×
1181
  }
1182

1183
  if (pCreateReq->learnerReplica == 0) {
8!
1184
    pCreateReq->learnerSelfIndex = -1;
8✔
1185
  }
1186
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
16✔
1187
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
8!
1188
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1189
  }
1190
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
8!
1191
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1192
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1193
  }
1194

1195
  SReplica *pReplica = NULL;
8✔
1196
  if (pCreateReq->selfIndex != -1) {
8!
1197
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
8✔
1198
  } else {
1199
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1200
  }
1201
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
8!
1202
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
8!
1203
    (void)tFreeSMountVnodeReq(&req);
×
1204
    code = TSDB_CODE_INVALID_MSG;
×
1205
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1206
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1207
    return code;
×
1208
  }
1209
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
8✔
1210
  vnodeCfg.mountVgId = req.mountVgId;
8✔
1211
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
8✔
1212
  wrapperCfg.mountId = req.mountId;
8✔
1213

1214
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
8✔
1215
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
8!
1216
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1217
    (void)tFreeSMountVnodeReq(&req);
×
1218
    vmReleaseVnode(pMgmt, pVnode);
×
1219
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1220
    return 0;
×
1221
  }
1222
  vmReleaseVnode(pMgmt, pVnode);
8✔
1223

1224
  wrapperCfg.diskPrimary = req.diskPrimary;
8✔
1225
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
8✔
1226
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
8!
1227
  releaseTfs = true;
8✔
1228

1229
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
8!
1230
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
8!
1231
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1232
  }
1233
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
8!
1234
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1235
  }
1236
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
8!
1237
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
8!
1238
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
8!
1239
_exit:
8✔
1240
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
8✔
1241
  if (code != 0) {
8!
1242
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1243
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1244
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1245
    vnodeClose(pImpl);
×
1246
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1247
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1248
  } else {
1249
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
8!
1250
          TMSG_INFO(pMsg->msgType));
1251
  }
1252

1253
  pMsg->code = code;
8✔
1254
  pMsg->info.rsp = NULL;
8✔
1255
  pMsg->info.rspLen = 0;
8✔
1256

1257
  (void)tFreeSMountVnodeReq(&req);
8✔
1258
  TAOS_RETURN(code);
8✔
1259
}
1260
#endif  // USE_MOUNT
1261

1262
// alter replica doesn't use this, but restore dnode still use this
1263
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,495✔
1264
  SAlterVnodeTypeReq req = {0};
3,495✔
1265
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
3,495!
1266
    terrno = TSDB_CODE_INVALID_MSG;
×
1267
    return -1;
×
1268
  }
1269

1270
  if (req.learnerReplicas == 0) {
1271
    req.learnerSelfIndex = -1;
1272
  }
1273

1274
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
3,495!
1275
        TMSG_INFO(pMsg->msgType));
1276

1277
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
3,495✔
1278
  if (pVnode == NULL) {
3,495!
1279
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1280
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1281
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1282
    return -1;
×
1283
  }
1284

1285
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
3,495✔
1286
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
3,495!
1287
  if (role == TAOS_SYNC_ROLE_VOTER) {
3,495!
1288
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1289
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1290
    vmReleaseVnode(pMgmt, pVnode);
×
1291
    return -1;
×
1292
  }
1293

1294
  dInfo("vgId:%d, checking node catch up", req.vgId);
3,495!
1295
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
3,495✔
1296
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
3,315✔
1297
    vmReleaseVnode(pMgmt, pVnode);
3,315✔
1298
    return -1;
3,315✔
1299
  }
1300

1301
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
180!
1302

1303
  int32_t vgId = req.vgId;
180✔
1304
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
180!
1305
        req.selfIndex, req.strict, req.changeVersion);
1306
  for (int32_t i = 0; i < req.replica; ++i) {
694✔
1307
    SReplica *pReplica = &req.replicas[i];
514✔
1308
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
514!
1309
  }
1310
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
180!
1311
    SReplica *pReplica = &req.learnerReplicas[i];
×
1312
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1313
  }
1314

1315
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
180!
1316
      req.learnerSelfIndex >= req.learnerReplica) {
180!
1317
    terrno = TSDB_CODE_INVALID_MSG;
×
1318
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1319
    vmReleaseVnode(pMgmt, pVnode);
×
1320
    return -1;
×
1321
  }
1322

1323
  SReplica *pReplica = NULL;
180✔
1324
  if (req.selfIndex != -1) {
180!
1325
    pReplica = &req.replicas[req.selfIndex];
180✔
1326
  } else {
1327
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1328
  }
1329

1330
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
180!
1331
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
180!
1332
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1333
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1334
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1335
    vmReleaseVnode(pMgmt, pVnode);
×
1336
    return -1;
×
1337
  }
1338

1339
  dInfo("vgId:%d, start to close vnode", vgId);
180!
1340
  SWrapperCfg wrapperCfg = {
180✔
1341
      .dropped = pVnode->dropped,
180✔
1342
      .vgId = pVnode->vgId,
180✔
1343
      .vgVersion = pVnode->vgVersion,
180✔
1344
      .diskPrimary = pVnode->diskPrimary,
180✔
1345
  };
1346
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
180✔
1347

1348
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
180✔
1349
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
180✔
1350

1351
  int32_t diskPrimary = wrapperCfg.diskPrimary;
180✔
1352
  char    path[TSDB_FILENAME_LEN] = {0};
180✔
1353
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
180✔
1354

1355
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
180!
1356
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
180!
1357
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1358
    return -1;
×
1359
  }
1360

1361
  dInfo("vgId:%d, begin to open vnode", vgId);
180!
1362
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
180✔
1363
  if (pImpl == NULL) {
180!
1364
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1365
    return -1;
×
1366
  }
1367

1368
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
180!
1369
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1370
    return -1;
×
1371
  }
1372

1373
  if (vnodeStart(pImpl) != 0) {
180!
1374
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1375
    return -1;
×
1376
  }
1377

1378
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
180!
1379
        req.vgId, TMSG_INFO(pMsg->msgType));
1380
  return 0;
180✔
1381
}
1382

1383
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1384
  SCheckLearnCatchupReq req = {0};
×
1385
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1386
    terrno = TSDB_CODE_INVALID_MSG;
×
1387
    return -1;
×
1388
  }
1389

1390
  if (req.learnerReplicas == 0) {
1391
    req.learnerSelfIndex = -1;
1392
  }
1393

1394
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1395
        TMSG_INFO(pMsg->msgType));
1396

1397
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1398
  if (pVnode == NULL) {
×
1399
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1400
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1401
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1402
    return -1;
×
1403
  }
1404

1405
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1406
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1407
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1408
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1409
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1410
    vmReleaseVnode(pMgmt, pVnode);
×
1411
    return -1;
×
1412
  }
1413

1414
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1415
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1416
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1417
    vmReleaseVnode(pMgmt, pVnode);
×
1418
    return -1;
×
1419
  }
1420

1421
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1422

1423
  vmReleaseVnode(pMgmt, pVnode);
×
1424

1425
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1426
        TMSG_INFO(pMsg->msgType));
1427

1428
  return 0;
×
1429
}
1430

1431
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
64✔
1432
  SDisableVnodeWriteReq req = {0};
64✔
1433
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
64!
1434
    terrno = TSDB_CODE_INVALID_MSG;
×
1435
    return -1;
×
1436
  }
1437

1438
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
64!
1439

1440
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
64✔
1441
  if (pVnode == NULL) {
64!
1442
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1443
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1444
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1445
    return -1;
×
1446
  }
1447

1448
  pVnode->disable = req.disable;
64✔
1449
  vmReleaseVnode(pMgmt, pVnode);
64✔
1450
  return 0;
64✔
1451
}
1452

1453
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
64✔
1454
  SAlterVnodeHashRangeReq req = {0};
64✔
1455
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
64!
1456
    terrno = TSDB_CODE_INVALID_MSG;
×
1457
    return -1;
×
1458
  }
1459

1460
  int32_t srcVgId = req.srcVgId;
64✔
1461
  int32_t dstVgId = req.dstVgId;
64✔
1462

1463
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
64✔
1464
  if (pVnode != NULL) {
64!
1465
    dError("vgId:%d, vnode already exist", dstVgId);
×
1466
    vmReleaseVnode(pMgmt, pVnode);
×
1467
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1468
    return -1;
×
1469
  }
1470

1471
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
64!
1472
        req.dstVgId);
1473
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
64✔
1474
  if (pVnode == NULL) {
64!
1475
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1476
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1477
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1478
    return -1;
×
1479
  }
1480

1481
  SWrapperCfg wrapperCfg = {
64✔
1482
      .dropped = pVnode->dropped,
64✔
1483
      .vgId = dstVgId,
1484
      .vgVersion = pVnode->vgVersion,
64✔
1485
      .diskPrimary = pVnode->diskPrimary,
64✔
1486
  };
1487
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
64✔
1488

1489
  // prepare alter
1490
  pVnode->toVgId = dstVgId;
64✔
1491
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
64!
1492
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1493
    return -1;
×
1494
  }
1495

1496
  dInfo("vgId:%d, close vnode", srcVgId);
64!
1497
  vmCloseVnode(pMgmt, pVnode, true, false);
64✔
1498

1499
  int32_t diskPrimary = wrapperCfg.diskPrimary;
64✔
1500
  char    srcPath[TSDB_FILENAME_LEN] = {0};
64✔
1501
  char    dstPath[TSDB_FILENAME_LEN] = {0};
64✔
1502
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
64✔
1503
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
64✔
1504

1505
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
64!
1506
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
64!
1507
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1508
    return -1;
×
1509
  }
1510

1511
  dInfo("vgId:%d, open vnode", dstVgId);
64!
1512
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
64✔
1513

1514
  if (pImpl == NULL) {
64!
1515
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1516
    return -1;
×
1517
  }
1518

1519
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
64!
1520
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1521
    return -1;
×
1522
  }
1523

1524
  if (vnodeStart(pImpl) != 0) {
64!
1525
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1526
    return -1;
×
1527
  }
1528

1529
  // complete alter
1530
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
64!
1531
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1532
    return -1;
×
1533
  }
1534

1535
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
64!
1536
  return 0;
64✔
1537
}
1538

1539
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,193✔
1540
  SAlterVnodeReplicaReq alterReq = {0};
1,193✔
1541
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
1,193!
1542
    terrno = TSDB_CODE_INVALID_MSG;
×
1543
    return -1;
×
1544
  }
1545

1546
  if (alterReq.learnerReplica == 0) {
1,193✔
1547
    alterReq.learnerSelfIndex = -1;
862✔
1548
  }
1549

1550
  int32_t vgId = alterReq.vgId;
1,193✔
1551
  dInfo(
1,193!
1552
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1553
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1554
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1555
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1556

1557
  for (int32_t i = 0; i < alterReq.replica; ++i) {
4,547✔
1558
    SReplica *pReplica = &alterReq.replicas[i];
3,354✔
1559
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
3,354!
1560
  }
1561
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
1,524✔
1562
    SReplica *pReplica = &alterReq.learnerReplicas[i];
331✔
1563
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
331!
1564
  }
1565

1566
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
1,193!
1567
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
1,193!
1568
    terrno = TSDB_CODE_INVALID_MSG;
×
1569
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1570
    return -1;
×
1571
  }
1572

1573
  SReplica *pReplica = NULL;
1,193✔
1574
  if (alterReq.selfIndex != -1) {
1,193!
1575
    pReplica = &alterReq.replicas[alterReq.selfIndex];
1,193✔
1576
  } else {
1577
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1578
  }
1579

1580
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,193!
1581
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,193!
1582
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1583
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
×
1584
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1585
    return -1;
×
1586
  }
1587

1588
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
1,193✔
1589
  if (pVnode == NULL) {
1,193!
1590
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1591
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1592
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1593
    return -1;
×
1594
  }
1595

1596
  dInfo("vgId:%d, start to close vnode", vgId);
1,193!
1597
  SWrapperCfg wrapperCfg = {
1,193✔
1598
      .dropped = pVnode->dropped,
1,193✔
1599
      .vgId = pVnode->vgId,
1,193✔
1600
      .vgVersion = pVnode->vgVersion,
1,193✔
1601
      .diskPrimary = pVnode->diskPrimary,
1,193✔
1602
  };
1603
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
1,193✔
1604

1605
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
1,193✔
1606
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
1,193✔
1607

1608
  int32_t diskPrimary = wrapperCfg.diskPrimary;
1,193✔
1609
  char    path[TSDB_FILENAME_LEN] = {0};
1,193✔
1610
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
1,193✔
1611

1612
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
1,193!
1613
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
1,193!
1614
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1615
    return -1;
×
1616
  }
1617

1618
  dInfo("vgId:%d, begin to open vnode", vgId);
1,193!
1619
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
1,193✔
1620
  if (pImpl == NULL) {
1,193!
1621
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1622
    return -1;
×
1623
  }
1624

1625
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
1,193!
1626
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1627
    return -1;
×
1628
  }
1629

1630
  if (vnodeStart(pImpl) != 0) {
1,193!
1631
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1632
    return -1;
×
1633
  }
1634

1635
  dInfo(
1,193!
1636
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1637
      "learnerSelfIndex:%d strict:%d",
1638
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1639
      alterReq.learnerSelfIndex, alterReq.strict);
1640
  return 0;
1,193✔
1641
}
1642

1643
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
4,237✔
1644
  int32_t       code = 0;
4,237✔
1645
  SDropVnodeReq dropReq = {0};
4,237✔
1646
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
4,237!
1647
    terrno = TSDB_CODE_INVALID_MSG;
×
1648
    return terrno;
×
1649
  }
1650

1651
  int32_t vgId = dropReq.vgId;
4,237✔
1652
  dInfo("vgId:%d, start to drop vnode", vgId);
4,237!
1653

1654
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
4,237!
1655
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1656
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
×
1657
    return terrno;
×
1658
  }
1659

1660
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
4,237✔
1661
  if (pVnode == NULL) {
4,237!
1662
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1663
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1664
    return terrno;
×
1665
  }
1666

1667
  pVnode->dropped = 1;
4,237✔
1668
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
4,237!
1669
    pVnode->dropped = 0;
×
1670
    vmReleaseVnode(pMgmt, pVnode);
×
1671
    return code;
×
1672
  }
1673

1674
  vmCloseVnode(pMgmt, pVnode, false, false);
4,237✔
1675
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
4,237!
1676
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1677
  }
1678

1679
  dInfo("vgId:%d, is dropped", vgId);
4,237!
1680
  return 0;
4,237✔
1681
}
1682

1683
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
155✔
1684
  SVArbHeartBeatReq arbHbReq = {0};
155✔
1685
  SVArbHeartBeatRsp arbHbRsp = {0};
155✔
1686
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
155!
1687
    terrno = TSDB_CODE_INVALID_MSG;
×
1688
    return -1;
×
1689
  }
1690

1691
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
155!
1692
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1693
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
×
1694
    goto _OVER;
×
1695
  }
1696

1697
  if (strlen(arbHbReq.arbToken) == 0) {
155!
1698
    terrno = TSDB_CODE_INVALID_MSG;
×
1699
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1700
    goto _OVER;
×
1701
  }
1702

1703
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
155✔
1704

1705
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
155✔
1706
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
155✔
1707
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
155✔
1708
  if (arbHbRsp.hbMembers == NULL) {
155!
1709
    goto _OVER;
×
1710
  }
1711

1712
  for (int32_t i = 0; i < size; i++) {
330✔
1713
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
175✔
1714
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
175✔
1715
    if (pVnode == NULL) {
175!
1716
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
×
1717
      continue;
×
1718
    }
1719

1720
    SVArbHbRspMember rspMember = {0};
175✔
1721
    rspMember.vgId = pReqMember->vgId;
175✔
1722
    rspMember.hbSeq = pReqMember->hbSeq;
175✔
1723
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
175!
1724
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1725
      vmReleaseVnode(pMgmt, pVnode);
×
1726
      continue;
×
1727
    }
1728

1729
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
175!
1730
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1731
      vmReleaseVnode(pMgmt, pVnode);
×
1732
      continue;
×
1733
    }
1734

1735
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
350!
1736
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1737
      vmReleaseVnode(pMgmt, pVnode);
×
1738
      goto _OVER;
×
1739
    }
1740

1741
    vmReleaseVnode(pMgmt, pVnode);
175✔
1742
  }
1743

1744
  SRpcMsg rspMsg = {.info = pMsg->info};
155✔
1745
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
155✔
1746
  if (rspLen < 0) {
155!
1747
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1748
    goto _OVER;
×
1749
  }
1750

1751
  void *pRsp = rpcMallocCont(rspLen);
155✔
1752
  if (pRsp == NULL) {
155!
1753
    terrno = terrno;
×
1754
    goto _OVER;
×
1755
  }
1756

1757
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
155!
1758
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1759
    rpcFreeCont(pRsp);
×
1760
    goto _OVER;
×
1761
  }
1762
  pMsg->info.rsp = pRsp;
155✔
1763
  pMsg->info.rspLen = rspLen;
155✔
1764

1765
  terrno = TSDB_CODE_SUCCESS;
155✔
1766

1767
_OVER:
155✔
1768
  tFreeSVArbHeartBeatReq(&arbHbReq);
155✔
1769
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
155✔
1770
  return terrno;
155✔
1771
}
1772

1773
SArray *vmGetMsgHandles() {
2,379✔
1774
  int32_t code = -1;
2,379✔
1775
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,379✔
1776
  if (pArray == NULL) goto _OVER;
2,379!
1777

1778
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1779
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1780
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1781
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1782
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1783
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1784
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1785
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1786
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1787
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1788
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1789
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1790
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1791
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1792
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1793
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1794
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1795
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1796
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1797
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1798
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1799
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1800
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1801
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1802
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1803
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1804
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1805
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1806
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1807
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1812
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,379!
1813
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1814
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1815
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1816
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1817
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1818
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1824

1825
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1832

1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1834
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1835
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,379!
1836
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1837
  // if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1838

1839
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1840
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1841
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1842
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1843
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1844
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1845
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1846

1847
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1848
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1849
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1850
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1851
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1852
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1853
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1854
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1855
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1856
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1857
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1858
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1859

1860
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,379!
1861
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,379!
1862
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,379!
1863
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,379!
1864
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,379!
1865

1866
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,379!
1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,379!
1868
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,379!
1869

1870
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
2,379!
1871
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
2,379!
1872
  code = 0;
2,379✔
1873

1874
_OVER:
2,379✔
1875
  if (code != 0) {
2,379!
1876
    taosArrayDestroy(pArray);
×
1877
    return NULL;
×
1878
  } else {
1879
    return pArray;
2,379✔
1880
  }
1881
}
1882

1883
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1884
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1885

1886
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1887
  while (pIter) {
×
1888
    SVnodeObj **ppVnode = pIter;
×
1889
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1890
      continue;
×
1891
    }
1892

1893
    SVnodeObj *pVnode = *ppVnode;
×
1894
    if (!pVnode->failed) {
×
1895
      SRawWriteMetrics metrics = {0};
×
1896
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1897
        // Add the metrics to the global metrics system with cluster ID
1898
        SName   name = {0};
×
1899
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1900
        if (code < 0) {
×
1901
          dError("failed to get db name since %s", tstrerror(code));
×
1902
          continue;
×
1903
        }
1904
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1905
        if (code != TSDB_CODE_SUCCESS) {
×
1906
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1907
        } else {
1908
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1909
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1910
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1911
          }
1912
        }
1913
      } else {
1914
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1915
      }
1916
    }
1917
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1918
  }
1919

1920
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1921
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc