• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4808

16 Oct 2025 11:40AM UTC coverage: 57.938% (-0.6%) from 58.524%
#4808

push

travis-ci

web-flow
fix(tref): increase TSDB_REF_OBJECTS from 100 to 2000 for improved reference handling (#33281)

137662 of 303532 branches covered (45.35%)

Branch coverage included in aggregate %.

209234 of 295200 relevant lines covered (70.88%)

4035326.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.44
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
87,128✔
30
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
87,128✔
31
  if (pInfo->pVloads == NULL) return;
87,128!
32

33
  tfsUpdateSize(pMgmt->pTfs);
87,128✔
34

35
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
87,128✔
36

37
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
87,128✔
38
  while (pIter) {
323,933✔
39
    SVnodeObj **ppVnode = pIter;
236,805✔
40
    if (ppVnode == NULL || *ppVnode == NULL) continue;
236,805!
41

42
    SVnodeObj *pVnode = *ppVnode;
236,805✔
43
    SVnodeLoad vload = {.vgId = pVnode->vgId};
236,805✔
44
    if (!pVnode->failed) {
236,805!
45
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
236,805!
46
        dError("failed to get vnode load");
×
47
      }
48
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
236,805✔
49
    }
50
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
473,610!
51
      dError("failed to push vnode load");
×
52
    }
53
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
236,805✔
54
  }
55

56
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
87,128✔
57
}
58

59
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
×
60
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
61

62
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
63
  while (pIter) {
×
64
    SVnodeObj **ppVnode = pIter;
×
65
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
66

67
    SVnodeObj *pVnode = *ppVnode;
×
68

69
    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
×
70
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
×
71
    }
72
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
73
  }
74

75
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
76
}
×
77

78
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
79
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
80
  if (!pInfo->pVloads) return;
×
81

82
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
83

84
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
85
  while (pIter) {
×
86
    SVnodeObj **ppVnode = pIter;
×
87
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
88

89
    SVnodeObj *pVnode = *ppVnode;
×
90
    if (!pVnode->failed) {
×
91
      SVnodeLoadLite vload = {0};
×
92
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
93
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
94
          taosArrayDestroy(pInfo->pVloads);
×
95
          pInfo->pVloads = NULL;
×
96
          break;
×
97
        }
98
      }
99
    }
100
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
101
  }
102

103
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
104
}
105

106
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
1✔
107
  SMonVloadInfo vloads = {0};
1✔
108
  vmGetVnodeLoads(pMgmt, &vloads, true);
1✔
109

110
  SArray *pVloads = vloads.pVloads;
1✔
111
  if (pVloads == NULL) return;
1!
112

113
  int32_t totalVnodes = 0;
1✔
114
  int32_t masterNum = 0;
1✔
115
  int64_t numOfSelectReqs = 0;
1✔
116
  int64_t numOfInsertReqs = 0;
1✔
117
  int64_t numOfInsertSuccessReqs = 0;
1✔
118
  int64_t numOfBatchInsertReqs = 0;
1✔
119
  int64_t numOfBatchInsertSuccessReqs = 0;
1✔
120

121
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
7✔
122
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
6✔
123
    numOfSelectReqs += pLoad->numOfSelectReqs;
6✔
124
    numOfInsertReqs += pLoad->numOfInsertReqs;
6✔
125
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
6✔
126
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
6✔
127
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
6✔
128
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
6!
129
      masterNum++;
6✔
130
    }
131
    totalVnodes++;
6✔
132
  }
133

134
  pInfo->vstat.totalVnodes = totalVnodes;
1✔
135
  pInfo->vstat.masterNum = masterNum;
1✔
136
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
1✔
137
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
1✔
138
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
1✔
139
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
1✔
140
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
1✔
141
  pMgmt->state.totalVnodes = totalVnodes;
1✔
142
  pMgmt->state.masterNum = masterNum;
1✔
143
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
1✔
144
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
1✔
145
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
1✔
146
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
1✔
147
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
1✔
148

149
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
1!
150
    dError("failed to get tfs monitor info");
×
151
  }
152
  taosArrayDestroy(pVloads);
1✔
153
}
154

155
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
1✔
156
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
1✔
157
  if (list_size == 0) return;
1!
158
  int32_t *vgroup_ids;
159
  char   **keys;
160
  int      r = 0;
×
161
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
162
  if (r) {
×
163
    dError("failed to get vgroup ids");
×
164
    return;
×
165
  }
166
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
167
  for (int i = 0; i < list_size; i++) {
×
168
    int32_t vgroup_id = vgroup_ids[i];
×
169
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
170
    if (vnode == NULL) {
×
171
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
172
      if (r) {
×
173
        dError("failed to delete monitor sample key:%s", keys[i]);
×
174
      }
175
    }
176
  }
177
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
178
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
179
  if (keys) taosMemoryFree(keys);
×
180
  return;
×
181
}
182

183
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
184
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
185
    return;
×
186
  }
187

188
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
189
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
190
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
191
  if (pValidVgroups == NULL) {
×
192
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
193
    return;
×
194
  }
195

196
  while (pIter != NULL) {
×
197
    SVnodeObj **ppVnode = pIter;
×
198
    if (ppVnode && *ppVnode) {
×
199
      int32_t vgId = (*ppVnode)->vgId;
×
200
      char    dummy = 1;  // hash table value (we only care about the key)
×
201
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
202
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
203
      }
204
    }
205
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
206
  }
207
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
208

209
  // Clean expired metrics by removing metrics for non-existent vgroups
210
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
211
  if (code != TSDB_CODE_SUCCESS) {
×
212
    dError("failed to clean expired metrics, code:%d", code);
×
213
  }
214

215
  taosHashCleanup(pValidVgroups);
×
216
}
217

218
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
7,012✔
219
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
7,012✔
220

221
  pCfg->vgId = pCreate->vgId;
7,012✔
222
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
7,012✔
223
  pCfg->dbId = pCreate->dbUid;
7,012✔
224
  pCfg->szPage = pCreate->pageSize * 1024;
7,012✔
225
  pCfg->szCache = pCreate->pages;
7,012✔
226
  pCfg->cacheLast = pCreate->cacheLast;
7,012✔
227
  pCfg->cacheLastSize = pCreate->cacheLastSize;
7,012✔
228
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
7,012✔
229
  pCfg->isWeak = true;
7,012✔
230
  pCfg->isTsma = pCreate->isTsma;
7,012✔
231
  pCfg->tsdbCfg.compression = pCreate->compression;
7,012✔
232
  pCfg->tsdbCfg.precision = pCreate->precision;
7,012✔
233
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
7,012✔
234
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
7,012✔
235
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
7,012✔
236
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
7,012✔
237
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
7,012✔
238
  pCfg->tsdbCfg.minRows = pCreate->minRows;
7,012✔
239
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
7,012✔
240
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
241
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
7,012✔
242
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
7,012✔
243
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3✔
244
  }
245
#else
246
  pCfg->tsdbCfg.encryptAlgorithm = 0;
247
#endif
248

249
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
7,012✔
250
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
7,012✔
251
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
7,012✔
252
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
7,012✔
253
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
7,012✔
254
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
7,012✔
255
  pCfg->walCfg.level = pCreate->walLevel;
7,012✔
256
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
257
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
7,012✔
258
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
7,012✔
259
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3✔
260
  }
261
#else
262
  pCfg->walCfg.encryptAlgorithm = 0;
263
#endif
264

265
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
266
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
7,012✔
267
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
7,012✔
268
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
3✔
269
  }
270
#else
271
  pCfg->tdbEncryptAlgorithm = 0;
272
#endif
273

274
  pCfg->sttTrigger = pCreate->sstTrigger;
7,012✔
275
  pCfg->hashBegin = pCreate->hashBegin;
7,012✔
276
  pCfg->hashEnd = pCreate->hashEnd;
7,012✔
277
  pCfg->hashMethod = pCreate->hashMethod;
7,012✔
278
  pCfg->hashPrefix = pCreate->hashPrefix;
7,012✔
279
  pCfg->hashSuffix = pCreate->hashSuffix;
7,012✔
280
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
7,012✔
281

282
  pCfg->ssChunkSize = pCreate->ssChunkSize;
7,012✔
283
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
7,012✔
284
  pCfg->ssCompact = pCreate->ssCompact;
7,012✔
285

286
  pCfg->standby = 0;
7,012✔
287
  pCfg->syncCfg.replicaNum = 0;
7,012✔
288
  pCfg->syncCfg.totalReplicaNum = 0;
7,012✔
289
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
7,012✔
290

291
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
7,012✔
292
  for (int32_t i = 0; i < pCreate->replica; ++i) {
17,100✔
293
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
10,087✔
294
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
10,087✔
295
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
10,087✔
296
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
10,087✔
297
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
10,087✔
298
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
10,087✔
299
    pCfg->syncCfg.replicaNum++;
10,088✔
300
  }
301
  if (pCreate->selfIndex != -1) {
7,013✔
302
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
6,817✔
303
  }
304
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
7,209✔
305
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
196✔
306
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
196✔
307
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
196✔
308
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
196✔
309
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
196✔
310
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
196✔
311
    pCfg->syncCfg.totalReplicaNum++;
196✔
312
  }
313
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
7,013✔
314
  if (pCreate->learnerSelfIndex != -1) {
7,013✔
315
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
196✔
316
  }
317
}
7,013✔
318

319
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
7,010✔
320
  pCfg->vgId = pCreate->vgId;
7,010✔
321
  pCfg->vgVersion = pCreate->vgVersion;
7,010✔
322
  pCfg->dropped = 0;
7,010✔
323
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
7,010✔
324
}
7,010✔
325

326
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
7,004✔
327
  SCreateVnodeReq req = {0};
7,004✔
328
  SVnodeCfg       vnodeCfg = {0};
7,004✔
329
  SWrapperCfg     wrapperCfg = {0};
7,004✔
330
  int32_t         code = -1;
7,004✔
331
  char            path[TSDB_FILENAME_LEN] = {0};
7,004✔
332

333
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
7,004!
334
    return TSDB_CODE_INVALID_MSG;
×
335
  }
336

337
  if (req.learnerReplica == 0) {
6,994✔
338
    req.learnerSelfIndex = -1;
6,800✔
339
  }
340

341
  dInfo(
6,994!
342
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
343
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
344
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
345
      "precision:%d compression:%d minRows:%d maxRows:%d"
346
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
347
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
348
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
349
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
350
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
351
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
352
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
353
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
354
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
355
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
356
      req.encryptAlgorithm);
357

358
  for (int32_t i = 0; i < req.replica; ++i) {
17,085✔
359
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
10,080!
360
          req.replicas[i].id);
361
  }
362
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
7,201✔
363
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
196!
364
          req.learnerReplicas[i].port, req.replicas[i].id);
365
  }
366

367
  SReplica *pReplica = NULL;
7,005✔
368
  if (req.selfIndex != -1) {
7,005✔
369
    pReplica = &req.replicas[req.selfIndex];
6,809✔
370
  } else {
371
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
196✔
372
  }
373
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
7,005!
374
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
7,005!
375
    (void)tFreeSCreateVnodeReq(&req);
×
376

377
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
378
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
×
379
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
380
    return code;
×
381
  }
382

383
  if (req.encryptAlgorithm == DND_CA_SM4) {
7,005✔
384
    if (strlen(tsEncryptKey) == 0) {
3!
385
      (void)tFreeSCreateVnodeReq(&req);
×
386
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
387
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
388
      return code;
×
389
    }
390
  }
391

392
  vmGenerateVnodeCfg(&req, &vnodeCfg);
7,005✔
393

394
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
7,002✔
395

396
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
7,004✔
397
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
7,005!
398
    dError("vgId:%d, already exist", req.vgId);
84!
399
    (void)tFreeSCreateVnodeReq(&req);
84✔
400
    vmReleaseVnode(pMgmt, pVnode);
84✔
401
    code = TSDB_CODE_VND_ALREADY_EXIST;
84✔
402
    return 0;
84✔
403
  }
404

405
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
6,921✔
406
  if (diskPrimary < 0) {
6,917!
407
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
6,917✔
408
  }
409
  wrapperCfg.diskPrimary = diskPrimary;
6,921✔
410

411
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
6,921✔
412

413
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
6,921!
414
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
415
    vmReleaseVnode(pMgmt, pVnode);
×
416
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
417
    (void)tFreeSCreateVnodeReq(&req);
×
418
    return code;
×
419
  }
420

421
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
6,921✔
422
  if (pImpl == NULL) {
6,921!
423
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
424
    code = terrno != 0 ? terrno : -1;
×
425
    goto _OVER;
×
426
  }
427

428
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
6,921✔
429
  if (code != 0) {
6,921!
430
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
431
    code = terrno != 0 ? terrno : code;
×
432
    goto _OVER;
×
433
  }
434

435
  code = vnodeStart(pImpl);
6,921✔
436
  if (code != 0) {
6,921!
437
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
438
    goto _OVER;
×
439
  }
440

441
  code = vmWriteVnodeListToFile(pMgmt);
6,921✔
442
  if (code != 0) {
6,921!
443
    code = terrno != 0 ? terrno : code;
×
444
    goto _OVER;
×
445
  }
446

447
_OVER:
6,921✔
448
  vmCleanPrimaryDisk(pMgmt, req.vgId);
6,921✔
449

450
  if (code != 0) {
6,921!
451
    vmCloseFailedVnode(pMgmt, req.vgId);
×
452

453
    vnodeClose(pImpl);
×
454
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
455
  } else {
456
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
6,921!
457
          TMSG_INFO(pMsg->msgType));
458
  }
459

460
  (void)tFreeSCreateVnodeReq(&req);
6,921✔
461
  terrno = code;
6,921✔
462
  return code;
6,921✔
463
}
464

465
#ifdef USE_MOUNT
466
typedef struct {
467
  int64_t dbId;
468
  int32_t vgId;
469
  int32_t diskPrimary;
470
} SMountDbVgId;
471
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
472
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
473

474
static int compareVnodeInfo(const void *p1, const void *p2) {
21✔
475
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
21✔
476
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
21✔
477

478
  if (v1->config.dbId == v2->config.dbId) {
21✔
479
    if (v1->config.vgId == v2->config.vgId) {
12!
480
      return 0;
×
481
    }
482
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
12!
483
  }
484

485
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
9!
486
}
487
static int compareVgDiskPrimary(const void *p1, const void *p2) {
21✔
488
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
21✔
489
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
21✔
490

491
  if (v1->dbId == v2->dbId) {
21✔
492
    if (v1->vgId == v2->vgId) {
12!
493
      return 0;
×
494
    }
495
    return v1->vgId > v2->vgId ? 1 : -1;
12!
496
  }
497

498
  return v1->dbId > v2->dbId ? 1 : -1;
9!
499
}
500

501
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
3✔
502
  int32_t   code = 0, lino = 0;
3✔
503
  TdFilePtr pFile = NULL;
3✔
504
  char     *content = NULL;
3✔
505
  SJson    *pJson = NULL;
3✔
506
  int64_t   size = 0;
3✔
507
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
3✔
508
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
3✔
509
  SArray   *pDisks = NULL;
3✔
510
  // step 1: fetch clusterId from dnode.json
511
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
3✔
512
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
3!
513
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
3!
514
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
3!
515
  if (taosReadFile(pFile, content, size) != size) {
3!
516
    TAOS_CHECK_EXIT(terrno);
×
517
  }
518
  content[size] = '\0';
3✔
519
  pJson = tjsonParse(content);
3✔
520
  if (pJson == NULL) {
3!
521
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
522
  }
523
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
3✔
524
  TAOS_CHECK_EXIT(code);
3!
525
  if (dropped == 1) {
3!
526
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
527
  }
528
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
3✔
529
  TAOS_CHECK_EXIT(code);
3!
530
  if (encryptScope != 0) {
3!
531
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
532
  }
533
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
3✔
534
  TAOS_CHECK_EXIT(code);
3!
535
  if (clusterId == 0) {
3!
536
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
537
  }
538
  pMountInfo->clusterId = clusterId;
3✔
539
  if (content != NULL) taosMemoryFreeClear(content);
3!
540
  if (pJson != NULL) {
3!
541
    cJSON_Delete(pJson);
3✔
542
    pJson = NULL;
3✔
543
  }
544
  if (pFile != NULL) taosCloseFile(&pFile);
3!
545
  // step 2: fetch dataDir from dnode/config/local.json
546
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
3!
547
  int32_t nDisks = taosArrayGetSize(pDisks);
3✔
548
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
3!
549
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
550
  }
551
  for (int32_t i = 0; i < nDisks; ++i) {
21✔
552
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
18✔
553
    if (!pMountInfo->pDisks[pDisk->level]) {
18✔
554
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
6✔
555
      if (!pMountInfo->pDisks[pDisk->level]) {
6!
556
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
557
      }
558
    }
559
    char *pDir = taosStrdup(pDisk->dir);
18!
560
    if (pDir == NULL) {
18!
561
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
562
    }
563
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
18!
564
      // put the primary disk to the first position of level 0
565
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
3!
566
        taosMemFree(pDir);
×
567
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
568
      }
569
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
30!
570
      taosMemFree(pDir);
×
571
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
572
    }
573
  }
574
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
12✔
575
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
9✔
576
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
9!
577
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
578
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
579
    }
580
  }
581
_exit:
3✔
582
  if (content != NULL) taosMemoryFreeClear(content);
3!
583
  if (pJson != NULL) cJSON_Delete(pJson);
3!
584
  if (pFile != NULL) taosCloseFile(&pFile);
3!
585
  if (pDisks != NULL) {
3!
586
    taosArrayDestroy(pDisks);
3✔
587
    pDisks = NULL;
3✔
588
  }
589
  if (code != 0) {
3!
590
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
591
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
592
  } else {
593
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
3!
594
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
595
  }
596
  TAOS_RETURN(code);
3✔
597
}
598

599
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
3✔
600
  int32_t       code = 0, lino = 0;
3✔
601
  SWrapperCfg  *pCfgs = NULL;
3✔
602
  int32_t       numOfVnodes = 0;
3✔
603
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
3✔
604
  TdDirPtr      pDir = NULL;
3✔
605
  TdDirEntryPtr de = NULL;
3✔
606
  SVnodeMgmt    vnodeMgmt = {0};
3✔
607
  SArray       *pVgCfgs = NULL;
3✔
608
  SArray       *pDbInfos = NULL;
3✔
609
  SArray       *pDiskPrimarys = NULL;
3✔
610

611
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
3✔
612
  vnodeMgmt.path = path;
3✔
613
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
3!
614
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
3!
615
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
3!
616
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
3!
617

618
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
3✔
619
  int32_t nVgDropped = 0, j = 0;
3✔
620
  for (int32_t i = 0; i < numOfVnodes; ++i) {
15✔
621
    SWrapperCfg *pCfg = &pCfgs[i];
12✔
622
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
623
    if (nDiskLevel0 > 1) {
12!
624
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
12✔
625
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
12!
626
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
12✔
627
                     pCfg->vgId);
628
    }
629
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
12!
630
    if (pCfg->dropped) {
12!
631
      ++nVgDropped;
×
632
      continue;
×
633
    }
634
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
12!
635
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
636
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
637
    }
638
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
12✔
639
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
12!
640
    if (pInfo->config.syncCfg.replicaNum > 1) {
12!
641
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
642
             pInfo->config.syncCfg.replicaNum);
643
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
644
    } else if (pInfo->config.vgId != pCfg->vgId) {
12!
645
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
646
             pCfg->vgId);
647
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
648
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
12!
649
               pInfo->config.walCfg.encryptAlgorithm) {
12!
650
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
×
651
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
652
             pInfo->config.tsdbCfg.encryptAlgorithm);
653
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
654
    }
655
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
12✔
656
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
12!
657
  }
658
  if (nVgDropped > 0) {
3!
659
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
660
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
661
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
662
  }
663
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
3✔
664
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
3✔
665
  if (nVgCfg != nDiskPrimary) {
3!
666
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
667
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
668
  }
669
  if (nVgCfg > 1) {
3!
670
    taosArraySort(pVgCfgs, compareVnodeInfo);
3✔
671
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
3✔
672
  }
673

674
  int64_t clusterId = pMountInfo->clusterId;
3✔
675
  int64_t dbId = 0, vgId = 0, nDb = 0;
3✔
676
  for (int32_t i = 0; i < nVgCfg; ++i) {
11✔
677
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
9✔
678
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
9✔
679
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
1!
680
             pInfo->config.syncCfg.nodeInfo->clusterId);
681
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
1!
682
    }
683
    if (dbId != pInfo->config.dbId) {
8✔
684
      dbId = pInfo->config.dbId;
4✔
685
      ++nDb;
4✔
686
    }
687
    if (vgId == pInfo->config.vgId) {
8!
688
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
689
    } else {
690
      vgId = pInfo->config.vgId;
8✔
691
    }
692
  }
693

694
  if (nDb > 0) {
2!
695
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
2!
696
    int32_t dbIdx = -1;
2✔
697
    for (int32_t i = 0; i < nVgCfg; ++i) {
10✔
698
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
8✔
699
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
8✔
700
      SMountDbInfo *pDbInfo = NULL;
8✔
701
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
8✔
702
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
4✔
703
        pDbInfo->dbId = pVgCfg->config.dbId;
4✔
704
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
4✔
705
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
4!
706
      } else {
707
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
4✔
708
      }
709
      SMountVgInfo vgInfo = {
8✔
710
          .diskPrimary = pDiskPrimary->diskPrimary,
8✔
711
          .vgId = pVgCfg->config.vgId,
8✔
712
          .dbId = pVgCfg->config.dbId,
8✔
713
          .cacheLastSize = pVgCfg->config.cacheLastSize,
8✔
714
          .szPage = pVgCfg->config.szPage,
8✔
715
          .szCache = pVgCfg->config.szCache,
8✔
716
          .szBuf = pVgCfg->config.szBuf,
8✔
717
          .cacheLast = pVgCfg->config.cacheLast,
8✔
718
          .standby = pVgCfg->config.standby,
8✔
719
          .hashMethod = pVgCfg->config.hashMethod,
8✔
720
          .hashBegin = pVgCfg->config.hashBegin,
8✔
721
          .hashEnd = pVgCfg->config.hashEnd,
8✔
722
          .hashPrefix = pVgCfg->config.hashPrefix,
8✔
723
          .hashSuffix = pVgCfg->config.hashSuffix,
8✔
724
          .sttTrigger = pVgCfg->config.sttTrigger,
8✔
725
          .replications = pVgCfg->config.syncCfg.replicaNum,
8✔
726
          .precision = pVgCfg->config.tsdbCfg.precision,
8✔
727
          .compression = pVgCfg->config.tsdbCfg.compression,
8✔
728
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
8✔
729
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
8✔
730
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
8✔
731
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
8✔
732
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
8✔
733
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
8✔
734
          .minRows = pVgCfg->config.tsdbCfg.minRows,
8✔
735
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
8✔
736
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
8✔
737
          .ssChunkSize = pVgCfg->config.ssChunkSize,
8✔
738
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
8✔
739
          .ssCompact = pVgCfg->config.ssCompact,
8✔
740
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
8✔
741
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
8✔
742
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
8✔
743
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
8✔
744
          .walSegSize = pVgCfg->config.walCfg.segSize,
8✔
745
          .walLevel = pVgCfg->config.walCfg.level,
8✔
746
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
8✔
747
          .committed = pVgCfg->state.committed,
8✔
748
          .commitID = pVgCfg->state.commitID,
8✔
749
          .commitTerm = pVgCfg->state.commitTerm,
8✔
750
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
8✔
751
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
8✔
752
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
8✔
753
      };
754
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
16!
755
    }
756
  }
757

758
  pMountInfo->pDbs = pDbInfos;
2✔
759

760
_exit:
3✔
761
  if (code != 0) {
3✔
762
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
1!
763
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
764
  }
765
  taosArrayDestroy(pDiskPrimarys);
3✔
766
  taosArrayDestroy(pVgCfgs);
3✔
767
  taosMemoryFreeClear(pCfgs);
3!
768
  TAOS_RETURN(code);
3✔
769
}
770

771
/**
772
 *   Retrieve the stables from vnode meta.
773
 */
774
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
2✔
775
  int32_t code = 0, lino = 0;
2✔
776
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
2✔
777
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
2✔
778
  SArray *suidList = NULL;
2✔
779
  SArray *pCols = NULL;
2✔
780
  SArray *pTags = NULL;
2✔
781
  SArray *pColExts = NULL;
2✔
782
  SArray *pTagExts = NULL;
2✔
783

784
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
2✔
785
  for (int32_t i = 0; i < nDb; ++i) {
6✔
786
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
4✔
787
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
4✔
788
    for (int32_t j = 0; j < nVg; ++j) {
4!
789
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
4✔
790
      SVnode        vnode = {
4✔
791
                 .config.vgId = pVgInfo->vgId,
4✔
792
                 .config.dbId = pVgInfo->dbId,
4✔
793
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
4✔
794
                 .config.szPage = pVgInfo->szPage,
4✔
795
                 .config.szCache = pVgInfo->szCache,
4✔
796
                 .config.szBuf = pVgInfo->szBuf,
4✔
797
                 .config.cacheLast = pVgInfo->cacheLast,
4✔
798
                 .config.standby = pVgInfo->standby,
4✔
799
                 .config.hashMethod = pVgInfo->hashMethod,
4✔
800
                 .config.hashBegin = pVgInfo->hashBegin,
4✔
801
                 .config.hashEnd = pVgInfo->hashEnd,
4✔
802
                 .config.hashPrefix = pVgInfo->hashPrefix,
4✔
803
                 .config.hashSuffix = pVgInfo->hashSuffix,
4✔
804
                 .config.sttTrigger = pVgInfo->sttTrigger,
4✔
805
                 .config.syncCfg.replicaNum = pVgInfo->replications,
4✔
806
                 .config.tsdbCfg.precision = pVgInfo->precision,
4✔
807
                 .config.tsdbCfg.compression = pVgInfo->compression,
4✔
808
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
4✔
809
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
4✔
810
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
4✔
811
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
4✔
812
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
4✔
813
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
4✔
814
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
4✔
815
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
4✔
816
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
4✔
817
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
4✔
818
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
4✔
819
                 .config.ssCompact = pVgInfo->ssCompact,
4✔
820
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
4✔
821
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
4✔
822
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
4✔
823
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
4✔
824
                 .config.walCfg.segSize = pVgInfo->walSegSize,
4✔
825
                 .config.walCfg.level = pVgInfo->walLevel,
4✔
826
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
4✔
827
                 .diskPrimary = pVgInfo->diskPrimary,
4✔
828
      };
829
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
4✔
830
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
4✔
831
               pVgInfo->vgId);
832
      vnode.path = path;
4✔
833

834
      int32_t rollback = vnodeShouldRollback(&vnode);
4✔
835
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
4!
836
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
837
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
838
        TAOS_CHECK_EXIT(code);
×
839
      } else {
840
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
4!
841
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
842

843
        SMetaReader mr = {0};
4✔
844
        tb_uid_t    suid = 0;
4✔
845
        SMeta      *pMeta = vnode.pMeta;
4✔
846

847
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
4✔
848
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
4!
849
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
850
        }
851
        taosArrayClear(suidList);
4✔
852
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
4!
853
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
4!
854
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
855
        int32_t nStbs = taosArrayGetSize(suidList);
4✔
856
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
4!
857
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
858
        }
859
        for (int32_t i = 0; i < nStbs; ++i) {
16✔
860
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
12✔
861
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
12!
862
                pVgInfo->dbId, suid, pReq->dnodeId);
863
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
12!
864
            TSDB_CHECK_CODE(code, lino, _exit0);
×
865
          }
866
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
12!
867
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
12!
868
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
869
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
870
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
871
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
872
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
873
          }
874
          SMountStbInfo stbInfo = {
12✔
875
              .req.source = TD_REQ_FROM_APP,
876
              .req.suid = suid,
877
              .req.colVer = mr.me.stbEntry.schemaRow.version,
12✔
878
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
12✔
879
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
12✔
880
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
12✔
881
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
12✔
882
          };
883
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
12✔
884
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
12!
885
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
886
          }
887
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
12!
888
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
889
          }
890

891
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
12!
892
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
893
          }
894
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
12!
895
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
896
          }
897
          taosArrayClear(pCols);
12✔
898
          taosArrayClear(pTags);
12✔
899
          taosArrayClear(pColExts);
12✔
900
          taosArrayClear(pTagExts);
12✔
901
          stbInfo.req.pColumns = pCols;
12✔
902
          stbInfo.req.pTags = pTags;
12✔
903
          stbInfo.pColExts = pColExts;
12✔
904
          stbInfo.pTagExts = pTagExts;
12✔
905

906
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
72✔
907
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
60✔
908
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
60✔
909
            SFieldWithOptions col = {
60✔
910
                .type = pSchema->type,
60✔
911
                .flags = pSchema->flags,
60✔
912
                .bytes = pSchema->bytes,
60✔
913
                .compress = pColComp->alg,
60✔
914
            };
915
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
60✔
916
            if (pSchema->colId != pColComp->id) {
60!
917
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
918
            }
919
            if (mr.me.pExtSchemas) {
60!
920
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
921
            }
922
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
60!
923
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
120!
924
          }
925
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
28✔
926
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
16✔
927
            SField   tag = {
16✔
928
                  .type = pSchema->type,
16✔
929
                  .flags = pSchema->flags,
16✔
930
                  .bytes = pSchema->bytes,
16✔
931
            };
932
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
16✔
933
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
16!
934
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
32!
935
          }
936
          tDecoderClear(&mr.coder);
12✔
937

938
          // serialize the SMountStbInfo
939
          int32_t firstPartLen = 0;
12✔
940
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
12✔
941
          if (msgLen <= 0) {
12!
942
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
943
          }
944
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
12!
945
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
12!
946
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
12✔
947
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
12✔
948
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
12!
949
            taosMemoryFree(pBuf);
×
950
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
951
          }
952
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
24!
953
            taosMemoryFree(pBuf);
×
954
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
955
          }
956
        }
957
      _exit0:
4✔
958
        metaReaderClear(&mr);
4✔
959
        metaClose(&vnode.pMeta);
4✔
960
        TAOS_CHECK_EXIT(code);
4!
961
      }
962
      break;  // retrieve stbs from one vnode is enough
4✔
963
    }
964
  }
965
_exit:
2✔
966
  if (code != 0) {
2!
967
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
968
           pReq->dnodeId, tstrerror(code), path);
969
  }
970
  taosArrayDestroy(suidList);
2✔
971
  taosArrayDestroy(pCols);
2✔
972
  taosArrayDestroy(pTags);
2✔
973
  taosArrayDestroy(pColExts);
2✔
974
  taosArrayDestroy(pTagExts);
2✔
975
  TAOS_RETURN(code);
2✔
976
}
977

978
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
9✔
979
  int32_t code = 0, lino = 0;
9✔
980
  int32_t retryTimes = 0;
9✔
981
  char    filepath[PATH_MAX] = {0};
9✔
982
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
9✔
983
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
9!
984
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
985
                  code, lino, _exit, terrno);
986
  int32_t ret = 0;
9✔
987
  do {
988
    ret = taosLockFile(*pFile);
11✔
989
    if (ret == 0) break;
11✔
990
    taosMsleep(1000);
3✔
991
    ++retryTimes;
3✔
992
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
3!
993
  } while (retryTimes < retryLimit);
3✔
994
  TAOS_CHECK_EXIT(ret);
9✔
995
_exit:
8✔
996
  if (code != 0) {
9✔
997
    (void)taosCloseFile(pFile);
1✔
998
    *pFile = NULL;
1✔
999
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
1!
1000
           filepath);
1001
  }
1002
  TAOS_RETURN(code);
9✔
1003
}
1004

1005
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
6✔
1006
  int32_t code = 0, lino = 0;
6✔
1007
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
6✔
1008
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
6✔
1009
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
5✔
1010
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
4✔
1011
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
4✔
1012
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
3✔
1013
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
3!
1014
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
3✔
1015
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
3!
1016
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
3✔
1017
           TD_DIRSEP);
1018
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
3!
1019
_exit:
3✔
1020
  if (code != 0) {
6✔
1021
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
3!
1022
           pReq->dnodeId, tstrerror(code), path);
1023
  }
1024
  TAOS_RETURN(code);
6✔
1025
}
1026

1027
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
6✔
1028
                                       SMountInfo *pMountInfo) {
1029
  int32_t code = 0, lino = 0;
6✔
1030
  pMountInfo->dnodeId = pReq->dnodeId;
6✔
1031
  pMountInfo->mountUid = pReq->mountUid;
6✔
1032
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
6✔
1033
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
6✔
1034
  pMountInfo->ignoreExist = pReq->ignoreExist;
6✔
1035
  pMountInfo->valLen = pReq->valLen;
6✔
1036
  pMountInfo->pVal = pReq->pVal;
6✔
1037
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
6✔
1038
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
3!
1039
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
3✔
1040
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
2!
1041
_exit:
2✔
1042
  if (code != 0) {
6✔
1043
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
4!
1044
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1045
  }
1046
  TAOS_RETURN(code);
6✔
1047
}
1048

1049
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
6✔
1050
  int32_t               code = 0, lino = 0;
6✔
1051
  int32_t               rspCode = 0;
6✔
1052
  SVnodeMgmt            vndMgmt = {0};
6✔
1053
  SMountInfo            mountInfo = {0};
6✔
1054
  void                 *pBuf = NULL;
6✔
1055
  int32_t               bufLen = 0;
6✔
1056
  SRetrieveMountPathReq req = {0};
6✔
1057

1058
  vndMgmt = *pMgmt;
6✔
1059
  vndMgmt.path = NULL;
6✔
1060
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
6!
1061
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
6!
1062
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
6✔
1063
_end:
2✔
1064
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
6!
1065
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
6!
1066
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
6!
1067
  pMsg->info.rsp = pBuf;
6✔
1068
  pMsg->info.rspLen = bufLen;
6✔
1069
_exit:
6✔
1070
  if (rspCode != 0) {
6!
1071
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1072
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1073
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1074
    rpcFreeCont(pBuf);
×
1075
    code = rspCode;
×
1076
  } else if (code != 0) {
6✔
1077
    // the client would receive the response with error msg
1078
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
4!
1079
           req.dnodeId, tstrerror(code), req.mountPath);
1080
  } else {
1081
    int32_t nVgs = 0;
2✔
1082
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
2✔
1083
    for (int32_t i = 0; i < nDbs; ++i) {
6✔
1084
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
4✔
1085
      nVgs += taosArrayGetSize(pDb->pVgs);
4✔
1086
    }
1087
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
2!
1088
  }
1089
  taosMemFreeClear(vndMgmt.path);
6!
1090
  tFreeMountInfo(&mountInfo, false);
6✔
1091
  TAOS_RETURN(code);
6✔
1092
}
1093

1094
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
8✔
1095
                            SMountVnodeReq *req, STfs *pMountTfs) {
1096
  int32_t    code = 0;
8✔
1097
  SVnodeInfo info = {0};
8✔
1098
  char       hostDir[TSDB_FILENAME_LEN] = {0};
8✔
1099
  char       mountDir[TSDB_FILENAME_LEN] = {0};
8✔
1100
  char       mountVnode[32] = {0};
8✔
1101

1102
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
8!
1103
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1104
    return code;
×
1105
  }
1106

1107
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
8✔
1108
  if ((code = taosMkDir(hostDir))) {
8!
1109
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1110
           tstrerror(code), hostDir);
1111
    return code;
×
1112
  }
1113

1114
  info.config = *pCfg;  // copy the config
8✔
1115
  info.state.committed = req->committed;
8✔
1116
  info.state.commitID = req->commitID;
8✔
1117
  info.state.commitTerm = req->commitTerm;
8✔
1118
  info.state.applied = req->committed;
8✔
1119
  info.state.applyTerm = req->commitTerm;
8✔
1120
  info.config.vndStats.numOfSTables = req->numOfSTables;
8✔
1121
  info.config.vndStats.numOfCTables = req->numOfCTables;
8✔
1122
  info.config.vndStats.numOfNTables = req->numOfNTables;
8✔
1123

1124
  SVnodeInfo oldInfo = {0};
8✔
1125
  oldInfo.config = vnodeCfgDefault;
8✔
1126
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
8!
1127
    if (oldInfo.config.dbId != info.config.dbId) {
×
1128
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1129
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1130
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1131
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1132
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1133
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1134

1135
    } else {
1136
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1137
    }
1138
    return code;
×
1139
  }
1140

1141
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1142
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1143
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
8✔
1144
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
8✔
1145
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1146
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
48✔
1147
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1148
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1149
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
40!
1150
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1151
             mountSubDir, hostSubDir, tstrerror(code));
1152
      return code;
×
1153
    }
1154
  }
1155
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
8!
1156
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
8!
1157
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1158
           req->mountName, tstrerror(code), hostDir);
1159
    return code;
×
1160
  }
1161
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
8!
1162
  return 0;
8✔
1163
}
1164

1165
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
8✔
1166
  int32_t          code = 0, lino = 0;
8✔
1167
  SMountVnodeReq   req = {0};
8✔
1168
  SCreateVnodeReq *pCreateReq = &req.createReq;
8✔
1169
  SVnodeCfg        vnodeCfg = {0};
8✔
1170
  SWrapperCfg      wrapperCfg = {0};
8✔
1171
  SVnode          *pImpl = NULL;
8✔
1172
  STfs            *pMountTfs = NULL;
8✔
1173
  char             path[TSDB_FILENAME_LEN] = {0};
8✔
1174
  bool             releaseTfs = false;
8✔
1175

1176
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
8!
1177
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1178
    return TSDB_CODE_INVALID_MSG;
×
1179
  }
1180

1181
  if (pCreateReq->learnerReplica == 0) {
8!
1182
    pCreateReq->learnerSelfIndex = -1;
8✔
1183
  }
1184
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
16✔
1185
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
8!
1186
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1187
  }
1188
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
8!
1189
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1190
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1191
  }
1192

1193
  SReplica *pReplica = NULL;
8✔
1194
  if (pCreateReq->selfIndex != -1) {
8!
1195
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
8✔
1196
  } else {
1197
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1198
  }
1199
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
8!
1200
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
8!
1201
    (void)tFreeSMountVnodeReq(&req);
×
1202
    code = TSDB_CODE_INVALID_MSG;
×
1203
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1204
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1205
    return code;
×
1206
  }
1207
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
8✔
1208
  vnodeCfg.mountVgId = req.mountVgId;
8✔
1209
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
8✔
1210
  wrapperCfg.mountId = req.mountId;
8✔
1211

1212
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
8✔
1213
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
8!
1214
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1215
    (void)tFreeSMountVnodeReq(&req);
×
1216
    vmReleaseVnode(pMgmt, pVnode);
×
1217
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1218
    return 0;
×
1219
  }
1220
  vmReleaseVnode(pMgmt, pVnode);
8✔
1221

1222
  wrapperCfg.diskPrimary = req.diskPrimary;
8✔
1223
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
8✔
1224
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
8!
1225
  releaseTfs = true;
8✔
1226

1227
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
8!
1228
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
8!
1229
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1230
  }
1231
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
8!
1232
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1233
  }
1234
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
8!
1235
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
8!
1236
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
8!
1237
_exit:
8✔
1238
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
8✔
1239
  if (code != 0) {
8!
1240
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1241
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1242
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1243
    vnodeClose(pImpl);
×
1244
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1245
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1246
  } else {
1247
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
8!
1248
          TMSG_INFO(pMsg->msgType));
1249
  }
1250

1251
  pMsg->code = code;
8✔
1252
  pMsg->info.rsp = NULL;
8✔
1253
  pMsg->info.rspLen = 0;
8✔
1254

1255
  (void)tFreeSMountVnodeReq(&req);
8✔
1256
  TAOS_RETURN(code);
8✔
1257
}
1258
#endif  // USE_MOUNT
1259

1260
// alter replica doesn't use this, but restore dnode still use this
1261
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,659✔
1262
  SAlterVnodeTypeReq req = {0};
3,659✔
1263
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
3,659!
1264
    terrno = TSDB_CODE_INVALID_MSG;
×
1265
    return -1;
×
1266
  }
1267

1268
  if (req.learnerReplicas == 0) {
1269
    req.learnerSelfIndex = -1;
1270
  }
1271

1272
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
3,659!
1273
        TMSG_INFO(pMsg->msgType));
1274

1275
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
3,659✔
1276
  if (pVnode == NULL) {
3,659!
1277
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1278
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1279
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1280
    return -1;
×
1281
  }
1282

1283
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
3,659✔
1284
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
3,659!
1285
  if (role == TAOS_SYNC_ROLE_VOTER) {
3,659!
1286
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1287
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1288
    vmReleaseVnode(pMgmt, pVnode);
×
1289
    return -1;
×
1290
  }
1291

1292
  dInfo("vgId:%d, checking node catch up", req.vgId);
3,659!
1293
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
3,659✔
1294
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
3,466✔
1295
    vmReleaseVnode(pMgmt, pVnode);
3,466✔
1296
    return -1;
3,466✔
1297
  }
1298

1299
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
193!
1300

1301
  int32_t vgId = req.vgId;
193✔
1302
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
193!
1303
        req.selfIndex, req.strict, req.changeVersion);
1304
  for (int32_t i = 0; i < req.replica; ++i) {
737✔
1305
    SReplica *pReplica = &req.replicas[i];
544✔
1306
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
544!
1307
  }
1308
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
193!
1309
    SReplica *pReplica = &req.learnerReplicas[i];
×
1310
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1311
  }
1312

1313
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
193!
1314
      req.learnerSelfIndex >= req.learnerReplica) {
193!
1315
    terrno = TSDB_CODE_INVALID_MSG;
×
1316
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1317
    vmReleaseVnode(pMgmt, pVnode);
×
1318
    return -1;
×
1319
  }
1320

1321
  SReplica *pReplica = NULL;
193✔
1322
  if (req.selfIndex != -1) {
193!
1323
    pReplica = &req.replicas[req.selfIndex];
193✔
1324
  } else {
1325
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1326
  }
1327

1328
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
193!
1329
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
193!
1330
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1331
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1332
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1333
    vmReleaseVnode(pMgmt, pVnode);
×
1334
    return -1;
×
1335
  }
1336

1337
  dInfo("vgId:%d, start to close vnode", vgId);
193!
1338
  SWrapperCfg wrapperCfg = {
193✔
1339
      .dropped = pVnode->dropped,
193✔
1340
      .vgId = pVnode->vgId,
193✔
1341
      .vgVersion = pVnode->vgVersion,
193✔
1342
      .diskPrimary = pVnode->diskPrimary,
193✔
1343
  };
1344
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
193✔
1345

1346
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
193✔
1347
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
193✔
1348

1349
  int32_t diskPrimary = wrapperCfg.diskPrimary;
193✔
1350
  char    path[TSDB_FILENAME_LEN] = {0};
193✔
1351
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
193✔
1352

1353
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
193!
1354
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
193!
1355
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1356
    return -1;
×
1357
  }
1358

1359
  dInfo("vgId:%d, begin to open vnode", vgId);
193!
1360
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
193✔
1361
  if (pImpl == NULL) {
193!
1362
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1363
    return -1;
×
1364
  }
1365

1366
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
193!
1367
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1368
    return -1;
×
1369
  }
1370

1371
  if (vnodeStart(pImpl) != 0) {
193!
1372
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1373
    return -1;
×
1374
  }
1375

1376
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
193!
1377
        req.vgId, TMSG_INFO(pMsg->msgType));
1378
  return 0;
193✔
1379
}
1380

1381
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1382
  SCheckLearnCatchupReq req = {0};
×
1383
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1384
    terrno = TSDB_CODE_INVALID_MSG;
×
1385
    return -1;
×
1386
  }
1387

1388
  if (req.learnerReplicas == 0) {
1389
    req.learnerSelfIndex = -1;
1390
  }
1391

1392
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1393
        TMSG_INFO(pMsg->msgType));
1394

1395
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1396
  if (pVnode == NULL) {
×
1397
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1398
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1399
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1400
    return -1;
×
1401
  }
1402

1403
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1404
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1405
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1406
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1407
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1408
    vmReleaseVnode(pMgmt, pVnode);
×
1409
    return -1;
×
1410
  }
1411

1412
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1413
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1414
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1415
    vmReleaseVnode(pMgmt, pVnode);
×
1416
    return -1;
×
1417
  }
1418

1419
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1420

1421
  vmReleaseVnode(pMgmt, pVnode);
×
1422

1423
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1424
        TMSG_INFO(pMsg->msgType));
1425

1426
  return 0;
×
1427
}
1428

1429
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
66✔
1430
  SDisableVnodeWriteReq req = {0};
66✔
1431
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
66!
1432
    terrno = TSDB_CODE_INVALID_MSG;
×
1433
    return -1;
×
1434
  }
1435

1436
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
66!
1437

1438
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
66✔
1439
  if (pVnode == NULL) {
66!
1440
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1441
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1442
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1443
    return -1;
×
1444
  }
1445

1446
  pVnode->disable = req.disable;
66✔
1447
  vmReleaseVnode(pMgmt, pVnode);
66✔
1448
  return 0;
66✔
1449
}
1450

1451
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
65✔
1452
  SAlterVnodeHashRangeReq req = {0};
65✔
1453
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
65!
1454
    terrno = TSDB_CODE_INVALID_MSG;
×
1455
    return -1;
×
1456
  }
1457

1458
  int32_t srcVgId = req.srcVgId;
65✔
1459
  int32_t dstVgId = req.dstVgId;
65✔
1460

1461
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
65✔
1462
  if (pVnode != NULL) {
65!
1463
    dError("vgId:%d, vnode already exist", dstVgId);
×
1464
    vmReleaseVnode(pMgmt, pVnode);
×
1465
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1466
    return -1;
×
1467
  }
1468

1469
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
65!
1470
        req.dstVgId);
1471
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
65✔
1472
  if (pVnode == NULL) {
65!
1473
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1474
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1475
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1476
    return -1;
×
1477
  }
1478

1479
  SWrapperCfg wrapperCfg = {
65✔
1480
      .dropped = pVnode->dropped,
65✔
1481
      .vgId = dstVgId,
1482
      .vgVersion = pVnode->vgVersion,
65✔
1483
      .diskPrimary = pVnode->diskPrimary,
65✔
1484
  };
1485
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
65✔
1486

1487
  // prepare alter
1488
  pVnode->toVgId = dstVgId;
65✔
1489
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
65!
1490
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1491
    return -1;
×
1492
  }
1493

1494
  dInfo("vgId:%d, close vnode", srcVgId);
65!
1495
  vmCloseVnode(pMgmt, pVnode, true, false);
65✔
1496

1497
  int32_t diskPrimary = wrapperCfg.diskPrimary;
65✔
1498
  char    srcPath[TSDB_FILENAME_LEN] = {0};
65✔
1499
  char    dstPath[TSDB_FILENAME_LEN] = {0};
65✔
1500
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
65✔
1501
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
65✔
1502

1503
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
65!
1504
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
65!
1505
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1506
    return -1;
×
1507
  }
1508

1509
  dInfo("vgId:%d, open vnode", dstVgId);
65!
1510
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
65✔
1511

1512
  if (pImpl == NULL) {
65!
1513
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1514
    return -1;
×
1515
  }
1516

1517
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
65!
1518
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1519
    return -1;
×
1520
  }
1521

1522
  if (vnodeStart(pImpl) != 0) {
65!
1523
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1524
    return -1;
×
1525
  }
1526

1527
  // complete alter
1528
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
65!
1529
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1530
    return -1;
×
1531
  }
1532

1533
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
65!
1534
  return 0;
65✔
1535
}
1536

1537
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,209✔
1538
  SAlterVnodeReplicaReq alterReq = {0};
1,209✔
1539
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
1,209!
1540
    terrno = TSDB_CODE_INVALID_MSG;
×
1541
    return -1;
×
1542
  }
1543

1544
  if (alterReq.learnerReplica == 0) {
1,209✔
1545
    alterReq.learnerSelfIndex = -1;
870✔
1546
  }
1547

1548
  int32_t vgId = alterReq.vgId;
1,209✔
1549
  dInfo(
1,209!
1550
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1551
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1552
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1553
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1554

1555
  for (int32_t i = 0; i < alterReq.replica; ++i) {
4,605✔
1556
    SReplica *pReplica = &alterReq.replicas[i];
3,396✔
1557
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
3,396!
1558
  }
1559
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
1,548✔
1560
    SReplica *pReplica = &alterReq.learnerReplicas[i];
339✔
1561
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
339!
1562
  }
1563

1564
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
1,209!
1565
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
1,209!
1566
    terrno = TSDB_CODE_INVALID_MSG;
×
1567
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1568
    return -1;
×
1569
  }
1570

1571
  SReplica *pReplica = NULL;
1,209✔
1572
  if (alterReq.selfIndex != -1) {
1,209!
1573
    pReplica = &alterReq.replicas[alterReq.selfIndex];
1,209✔
1574
  } else {
1575
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1576
  }
1577

1578
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,209!
1579
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,209!
1580
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1581
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
×
1582
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1583
    return -1;
×
1584
  }
1585

1586
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
1,209✔
1587
  if (pVnode == NULL) {
1,209!
1588
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1589
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1590
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1591
    return -1;
×
1592
  }
1593

1594
  dInfo("vgId:%d, start to close vnode", vgId);
1,209!
1595
  SWrapperCfg wrapperCfg = {
1,209✔
1596
      .dropped = pVnode->dropped,
1,209✔
1597
      .vgId = pVnode->vgId,
1,209✔
1598
      .vgVersion = pVnode->vgVersion,
1,209✔
1599
      .diskPrimary = pVnode->diskPrimary,
1,209✔
1600
  };
1601
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
1,209✔
1602

1603
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
1,209✔
1604
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
1,209✔
1605

1606
  int32_t diskPrimary = wrapperCfg.diskPrimary;
1,209✔
1607
  char    path[TSDB_FILENAME_LEN] = {0};
1,209✔
1608
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
1,209✔
1609

1610
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
1,209!
1611
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
1,209!
1612
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1613
    return -1;
×
1614
  }
1615

1616
  dInfo("vgId:%d, begin to open vnode", vgId);
1,209!
1617
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
1,209✔
1618
  if (pImpl == NULL) {
1,209!
1619
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1620
    return -1;
×
1621
  }
1622

1623
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
1,209!
1624
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1625
    return -1;
×
1626
  }
1627

1628
  if (vnodeStart(pImpl) != 0) {
1,209!
1629
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1630
    return -1;
×
1631
  }
1632

1633
  dInfo(
1,209!
1634
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1635
      "learnerSelfIndex:%d strict:%d",
1636
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1637
      alterReq.learnerSelfIndex, alterReq.strict);
1638
  return 0;
1,209✔
1639
}
1640

1641
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
120✔
1642
  SAlterVnodeElectBaselineReq alterReq = {0};
120✔
1643
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
120!
1644
    return TSDB_CODE_INVALID_MSG;
×
1645
  }
1646

1647
  int32_t vgId = alterReq.vgId;
120✔
1648
  dInfo(
120!
1649
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
1650
      vgId, TMSG_INFO(pMsg->msgType), alterReq.electBaseLine);
1651

1652
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
120✔
1653
  if (pVnode == NULL) {
120!
1654
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1655
    return terrno;
×
1656
  }
1657

1658
  if(vnodeSetElectBaseline(pVnode->pImpl, alterReq.electBaseLine) != 0){
120!
1659
    vmReleaseVnode(pMgmt, pVnode);
×
1660
    return -1;
×
1661
  }
1662

1663
  vmReleaseVnode(pMgmt, pVnode);
120✔
1664
  return 0;
120✔
1665
}
1666

1667
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,065✔
1668
  int32_t       code = 0;
3,065✔
1669
  SDropVnodeReq dropReq = {0};
3,065✔
1670
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
3,065!
1671
    terrno = TSDB_CODE_INVALID_MSG;
×
1672
    return terrno;
×
1673
  }
1674

1675
  int32_t vgId = dropReq.vgId;
3,065✔
1676
  dInfo("vgId:%d, start to drop vnode", vgId);
3,065!
1677

1678
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
3,065!
1679
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1680
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
×
1681
    return terrno;
×
1682
  }
1683

1684
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
3,065✔
1685
  if (pVnode == NULL) {
3,065!
1686
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1687
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1688
    return terrno;
×
1689
  }
1690

1691
  pVnode->dropped = 1;
3,065✔
1692
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
3,065!
1693
    pVnode->dropped = 0;
×
1694
    vmReleaseVnode(pMgmt, pVnode);
×
1695
    return code;
×
1696
  }
1697

1698
  vmCloseVnode(pMgmt, pVnode, false, false);
3,065✔
1699
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
3,065!
1700
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1701
  }
1702

1703
  dInfo("vgId:%d, is dropped", vgId);
3,065!
1704
  return 0;
3,065✔
1705
}
1706

1707
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
262✔
1708
  SVArbHeartBeatReq arbHbReq = {0};
262✔
1709
  SVArbHeartBeatRsp arbHbRsp = {0};
262✔
1710
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
262!
1711
    terrno = TSDB_CODE_INVALID_MSG;
×
1712
    return -1;
×
1713
  }
1714

1715
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
262!
1716
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1717
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
×
1718
    goto _OVER;
×
1719
  }
1720

1721
  if (strlen(arbHbReq.arbToken) == 0) {
262!
1722
    terrno = TSDB_CODE_INVALID_MSG;
×
1723
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1724
    goto _OVER;
×
1725
  }
1726

1727
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
262✔
1728

1729
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
262✔
1730
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
262✔
1731
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
262✔
1732
  if (arbHbRsp.hbMembers == NULL) {
262!
1733
    goto _OVER;
×
1734
  }
1735

1736
  for (int32_t i = 0; i < size; i++) {
539✔
1737
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
277✔
1738
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
277✔
1739
    if (pVnode == NULL) {
277✔
1740
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
28!
1741
      continue;
28✔
1742
    }
1743

1744
    SVArbHbRspMember rspMember = {0};
249✔
1745
    rspMember.vgId = pReqMember->vgId;
249✔
1746
    rspMember.hbSeq = pReqMember->hbSeq;
249✔
1747
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
249!
1748
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1749
      vmReleaseVnode(pMgmt, pVnode);
×
1750
      continue;
×
1751
    }
1752

1753
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
249!
1754
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1755
      vmReleaseVnode(pMgmt, pVnode);
×
1756
      continue;
×
1757
    }
1758

1759
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
498!
1760
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1761
      vmReleaseVnode(pMgmt, pVnode);
×
1762
      goto _OVER;
×
1763
    }
1764

1765
    vmReleaseVnode(pMgmt, pVnode);
249✔
1766
  }
1767

1768
  SRpcMsg rspMsg = {.info = pMsg->info};
262✔
1769
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
262✔
1770
  if (rspLen < 0) {
262!
1771
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1772
    goto _OVER;
×
1773
  }
1774

1775
  void *pRsp = rpcMallocCont(rspLen);
262✔
1776
  if (pRsp == NULL) {
262!
1777
    terrno = terrno;
×
1778
    goto _OVER;
×
1779
  }
1780

1781
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
262!
1782
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1783
    rpcFreeCont(pRsp);
×
1784
    goto _OVER;
×
1785
  }
1786
  pMsg->info.rsp = pRsp;
262✔
1787
  pMsg->info.rspLen = rspLen;
262✔
1788

1789
  terrno = TSDB_CODE_SUCCESS;
262✔
1790

1791
_OVER:
262✔
1792
  tFreeSVArbHeartBeatReq(&arbHbReq);
262✔
1793
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
262✔
1794
  return terrno;
262✔
1795
}
1796

1797
SArray *vmGetMsgHandles() {
1,809✔
1798
  int32_t code = -1;
1,809✔
1799
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
1,809✔
1800
  if (pArray == NULL) goto _OVER;
1,809!
1801

1802
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1803
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,809!
1804
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,809!
1805
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,809!
1806
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1807
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1812
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1813
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1814
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1815
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1816
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1817
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1818
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1824
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1825
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1832
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,809!
1834
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,809!
1835
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1836
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1837
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1838
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1839
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1840
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1841
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1842
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1843
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1844
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1845
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1846
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1847
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1848
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1849
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1850
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1851
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1852
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1853

1854
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1855
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1856
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1857
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1858
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1859
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1860
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1861
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1862

1863
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1864
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1865
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,809!
1866
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1868

1869
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1870
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1871
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1872
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1873
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1874
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1875
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1877

1878
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1879
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1880
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1881
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1882
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1883
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1884
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1885
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1886
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1887
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1888
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1889
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1890

1891
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,809!
1892
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,809!
1893
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,809!
1894
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,809!
1895
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,809!
1896

1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,809!
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,809!
1899
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,809!
1900

1901
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
1,809!
1902
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
1,809!
1903
  code = 0;
1,809✔
1904

1905
_OVER:
1,809✔
1906
  if (code != 0) {
1,809!
1907
    taosArrayDestroy(pArray);
×
1908
    return NULL;
×
1909
  } else {
1910
    return pArray;
1,809✔
1911
  }
1912
}
1913

1914
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1915
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1916

1917
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1918
  while (pIter) {
×
1919
    SVnodeObj **ppVnode = pIter;
×
1920
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1921
      continue;
×
1922
    }
1923

1924
    SVnodeObj *pVnode = *ppVnode;
×
1925
    if (!pVnode->failed) {
×
1926
      SRawWriteMetrics metrics = {0};
×
1927
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1928
        // Add the metrics to the global metrics system with cluster ID
1929
        SName   name = {0};
×
1930
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1931
        if (code < 0) {
×
1932
          dError("failed to get db name since %s", tstrerror(code));
×
1933
          continue;
×
1934
        }
1935
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1936
        if (code != TSDB_CODE_SUCCESS) {
×
1937
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1938
        } else {
1939
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1940
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1941
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1942
          }
1943
        }
1944
      } else {
1945
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1946
      }
1947
    }
1948
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1949
  }
1950

1951
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1952
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc