• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.85
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
111,228✔
30
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
111,228✔
31
  if (pInfo->pVloads == NULL) return;
111,228!
32

33
  tfsUpdateSize(pMgmt->pTfs);
111,228✔
34

35
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
111,228✔
36

37
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
111,228✔
38
  while (pIter) {
372,091✔
39
    SVnodeObj **ppVnode = pIter;
260,863✔
40
    if (ppVnode == NULL || *ppVnode == NULL) continue;
260,863!
41

42
    SVnodeObj *pVnode = *ppVnode;
260,863✔
43
    SVnodeLoad vload = {.vgId = pVnode->vgId};
260,863✔
44
    if (!pVnode->failed) {
260,863!
45
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
260,863!
46
        dError("failed to get vnode load");
×
47
      }
48
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
260,863✔
49
    }
50
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
521,726!
51
      dError("failed to push vnode load");
×
52
    }
53
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
260,863✔
54
  }
55

56
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
111,228✔
57
}
58

59
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
5✔
60
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
5✔
61
  if (!pInfo->pVloads) return;
5!
62

63
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
5✔
64

65
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
5✔
66
  while (pIter) {
5!
67
    SVnodeObj **ppVnode = pIter;
×
68
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
69

70
    SVnodeObj *pVnode = *ppVnode;
×
71
    if (!pVnode->failed) {
×
72
      SVnodeLoadLite vload = {0};
×
73
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
74
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
75
          taosArrayDestroy(pInfo->pVloads);
×
76
          pInfo->pVloads = NULL;
×
77
          break;
×
78
        }
79
      }
80
    }
81
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
82
  }
83

84
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
5✔
85
}
86

87
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
16✔
88
  SMonVloadInfo vloads = {0};
16✔
89
  vmGetVnodeLoads(pMgmt, &vloads, true);
16✔
90

91
  SArray *pVloads = vloads.pVloads;
16✔
92
  if (pVloads == NULL) return;
16!
93

94
  int32_t totalVnodes = 0;
16✔
95
  int32_t masterNum = 0;
16✔
96
  int64_t numOfSelectReqs = 0;
16✔
97
  int64_t numOfInsertReqs = 0;
16✔
98
  int64_t numOfInsertSuccessReqs = 0;
16✔
99
  int64_t numOfBatchInsertReqs = 0;
16✔
100
  int64_t numOfBatchInsertSuccessReqs = 0;
16✔
101

102
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
54✔
103
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
38✔
104
    numOfSelectReqs += pLoad->numOfSelectReqs;
38✔
105
    numOfInsertReqs += pLoad->numOfInsertReqs;
38✔
106
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
38✔
107
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
38✔
108
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
38✔
109
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
38!
110
      masterNum++;
38✔
111
    }
112
    totalVnodes++;
38✔
113
  }
114

115
  pInfo->vstat.totalVnodes = totalVnodes;
16✔
116
  pInfo->vstat.masterNum = masterNum;
16✔
117
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
16✔
118
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
16✔
119
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
16✔
120
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
16✔
121
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
16✔
122
  pMgmt->state.totalVnodes = totalVnodes;
16✔
123
  pMgmt->state.masterNum = masterNum;
16✔
124
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
16✔
125
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
16✔
126
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
16✔
127
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
16✔
128
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
16✔
129

130
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
16!
131
    dError("failed to get tfs monitor info");
×
132
  }
133
  taosArrayDestroy(pVloads);
16✔
134
}
135

136
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
16✔
137
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
16✔
138
  if (list_size == 0) return;
16✔
139
  int32_t *vgroup_ids;
140
  char   **keys;
141
  int      r = 0;
1✔
142
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
1✔
143
  if (r) {
1!
144
    dError("failed to get vgroup ids");
×
145
    return;
×
146
  }
147
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
1✔
148
  for (int i = 0; i < list_size; i++) {
2✔
149
    int32_t vgroup_id = vgroup_ids[i];
1✔
150
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
1✔
151
    if (vnode == NULL) {
1!
152
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
153
      if (r) {
×
154
        dError("failed to delete monitor sample key:%s", keys[i]);
×
155
      }
156
    }
157
  }
158
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
1✔
159
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
1!
160
  if (keys) taosMemoryFree(keys);
1!
161
  return;
1✔
162
}
163

164
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
165
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
166
    return;
×
167
  }
168

169
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
170
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
171
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
172
  if (pValidVgroups == NULL) {
×
173
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
174
    return;
×
175
  }
176

177
  while (pIter != NULL) {
×
178
    SVnodeObj **ppVnode = pIter;
×
179
    if (ppVnode && *ppVnode) {
×
180
      int32_t vgId = (*ppVnode)->vgId;
×
181
      char    dummy = 1;  // hash table value (we only care about the key)
×
182
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
183
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
184
      }
185
    }
186
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
187
  }
188
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
189

190
  // Clean expired metrics by removing metrics for non-existent vgroups
191
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
192
  if (code != TSDB_CODE_SUCCESS) {
×
193
    dError("failed to clean expired metrics, code:%d", code);
×
194
  }
195

196
  taosHashCleanup(pValidVgroups);
×
197
}
198

199
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
11,391✔
200
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
11,391✔
201

202
  pCfg->vgId = pCreate->vgId;
11,391✔
203
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
11,391✔
204
  pCfg->dbId = pCreate->dbUid;
11,391✔
205
  pCfg->szPage = pCreate->pageSize * 1024;
11,391✔
206
  pCfg->szCache = pCreate->pages;
11,391✔
207
  pCfg->cacheLast = pCreate->cacheLast;
11,391✔
208
  pCfg->cacheLastSize = pCreate->cacheLastSize;
11,391✔
209
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
11,391✔
210
  pCfg->isWeak = true;
11,391✔
211
  pCfg->isTsma = pCreate->isTsma;
11,391✔
212
  pCfg->tsdbCfg.compression = pCreate->compression;
11,391✔
213
  pCfg->tsdbCfg.precision = pCreate->precision;
11,391✔
214
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
11,391✔
215
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
11,391✔
216
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
11,391✔
217
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
11,391✔
218
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
11,391✔
219
  pCfg->tsdbCfg.minRows = pCreate->minRows;
11,391✔
220
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
11,391✔
221
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
11,418✔
222
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
27✔
223
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
27✔
224
    if (i == 0) {
27✔
225
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
9!
226
    }
227
  }
228
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
229
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
11,390✔
230
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
11,390✔
231
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1✔
232
  }
233
#else
234
  pCfg->tsdbCfg.encryptAlgorithm = 0;
235
#endif
236

237
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
11,390✔
238
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
11,390✔
239
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
11,390✔
240
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
11,390✔
241
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
11,390✔
242
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
11,390✔
243
  pCfg->walCfg.level = pCreate->walLevel;
11,390✔
244
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
245
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
11,390✔
246
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
11,390✔
247
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1✔
248
  }
249
#else
250
  pCfg->walCfg.encryptAlgorithm = 0;
251
#endif
252

253
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
254
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
11,390✔
255
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
11,390✔
256
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
1✔
257
  }
258
#else
259
  pCfg->tdbEncryptAlgorithm = 0;
260
#endif
261

262
  pCfg->sttTrigger = pCreate->sstTrigger;
11,390✔
263
  pCfg->hashBegin = pCreate->hashBegin;
11,390✔
264
  pCfg->hashEnd = pCreate->hashEnd;
11,390✔
265
  pCfg->hashMethod = pCreate->hashMethod;
11,390✔
266
  pCfg->hashPrefix = pCreate->hashPrefix;
11,390✔
267
  pCfg->hashSuffix = pCreate->hashSuffix;
11,390✔
268
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
11,390✔
269

270
  pCfg->ssChunkSize = pCreate->ssChunkSize;
11,390✔
271
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
11,390✔
272
  pCfg->ssCompact = pCreate->ssCompact;
11,390✔
273

274
  pCfg->standby = 0;
11,390✔
275
  pCfg->syncCfg.replicaNum = 0;
11,390✔
276
  pCfg->syncCfg.totalReplicaNum = 0;
11,390✔
277
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
11,390✔
278

279
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
11,390✔
280
  for (int32_t i = 0; i < pCreate->replica; ++i) {
26,303✔
281
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
14,912✔
282
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
14,912✔
283
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
14,912✔
284
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
14,912✔
285
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
14,912✔
286
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
14,912✔
287
    pCfg->syncCfg.replicaNum++;
14,913✔
288
  }
289
  if (pCreate->selfIndex != -1) {
11,391✔
290
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
11,142✔
291
  }
292
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
11,640✔
293
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
249✔
294
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
249✔
295
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
249✔
296
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
249✔
297
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
249✔
298
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
249✔
299
    pCfg->syncCfg.totalReplicaNum++;
249✔
300
  }
301
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
11,391✔
302
  if (pCreate->learnerSelfIndex != -1) {
11,391✔
303
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
249✔
304
  }
305
}
11,391✔
306

307
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
11,390✔
308
  pCfg->vgId = pCreate->vgId;
11,390✔
309
  pCfg->vgVersion = pCreate->vgVersion;
11,390✔
310
  pCfg->dropped = 0;
11,390✔
311
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
11,390✔
312
}
11,390✔
313

314
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
11,390✔
315
  if (pReq->isTsma) {
11,390!
316
    SMsgHead *smaMsg = pReq->pTsma;
×
317
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
×
318
    return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
×
319
  }
320
  return 0;
11,390✔
321
}
322

323
#if 0
324
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
325
  if (pReq->isTsma) {
326
    SMsgHead *smaMsg = pReq->pTsma;
327
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
328
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
329
  }
330
  return 0;
331
}
332
#endif
333

334
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
11,385✔
335
  SCreateVnodeReq req = {0};
11,385✔
336
  SVnodeCfg       vnodeCfg = {0};
11,385✔
337
  SWrapperCfg     wrapperCfg = {0};
11,385✔
338
  int32_t         code = -1;
11,385✔
339
  char            path[TSDB_FILENAME_LEN] = {0};
11,385✔
340

341
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
11,385!
342
    return TSDB_CODE_INVALID_MSG;
×
343
  }
344

345
  if (req.learnerReplica == 0) {
11,330✔
346
    req.learnerSelfIndex = -1;
11,088✔
347
  }
348

349
  dInfo(
11,330!
350
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
351
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
352
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
353
      "precision:%d compression:%d minRows:%d maxRows:%d"
354
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
355
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
356
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
357
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
358
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
359
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
360
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
361
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
362
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
363
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
364
      req.encryptAlgorithm);
365

366
  for (int32_t i = 0; i < req.replica; ++i) {
26,304✔
367
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
14,912!
368
          req.replicas[i].id);
369
  }
370
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
11,641✔
371
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
249!
372
          req.learnerReplicas[i].port, req.replicas[i].id);
373
  }
374

375
  SReplica *pReplica = NULL;
11,392✔
376
  if (req.selfIndex != -1) {
11,392✔
377
    pReplica = &req.replicas[req.selfIndex];
11,142✔
378
  } else {
379
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
250✔
380
  }
381
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
11,392!
382
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
11,391!
383
    (void)tFreeSCreateVnodeReq(&req);
1✔
384
    code = TSDB_CODE_INVALID_MSG;
×
385
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
×
386
           pReplica->fqdn, pReplica->port, tstrerror(code));
387
    return code;
×
388
  }
389

390
  if (req.encryptAlgorithm == DND_CA_SM4) {
11,391✔
391
    if (strlen(tsEncryptKey) == 0) {
1!
392
      (void)tFreeSCreateVnodeReq(&req);
×
393
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
394
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
395
      return code;
×
396
    }
397
  }
398

399
  vmGenerateVnodeCfg(&req, &vnodeCfg);
11,391✔
400

401
  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
11,390!
402
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
×
403
    goto _OVER;
×
404
  }
405

406
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
11,390✔
407

408
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
11,390✔
409
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
11,391!
410
    dError("vgId:%d, already exist", req.vgId);
144!
411
    (void)tFreeSCreateVnodeReq(&req);
144✔
412
    vmReleaseVnode(pMgmt, pVnode);
144✔
413
    code = TSDB_CODE_VND_ALREADY_EXIST;
144✔
414
    return 0;
144✔
415
  }
416

417
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
11,247✔
418
  if (diskPrimary < 0) {
11,246✔
419
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
11,245✔
420
  }
421
  wrapperCfg.diskPrimary = diskPrimary;
11,248✔
422

423
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
11,248✔
424

425
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
11,248!
426
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
427
    vmReleaseVnode(pMgmt, pVnode);
×
428
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
429
    (void)tFreeSCreateVnodeReq(&req);
×
430
    return code;
×
431
  }
432

433
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
11,247✔
434
  if (pImpl == NULL) {
11,247!
435
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
436
    code = terrno != 0 ? terrno : -1;
×
437
    goto _OVER;
×
438
  }
439

440
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
11,247✔
441
  if (code != 0) {
11,247!
442
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
443
    code = terrno != 0 ? terrno : code;
×
444
    goto _OVER;
×
445
  }
446

447
#if 0
448
  code = vmTsmaProcessCreate(pImpl, &req);
449
  if (code != 0) {
450
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
451
    code = terrno;
452
    goto _OVER;
453
  }
454
#endif
455

456
  code = vnodeStart(pImpl);
11,247✔
457
  if (code != 0) {
11,247!
458
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
459
    goto _OVER;
×
460
  }
461

462
  code = vmWriteVnodeListToFile(pMgmt);
11,247✔
463
  if (code != 0) {
11,247!
464
    code = terrno != 0 ? terrno : code;
×
465
    goto _OVER;
×
466
  }
467

468
_OVER:
11,247✔
469
  vmCleanPrimaryDisk(pMgmt, req.vgId);
11,247✔
470

471
  if (code != 0) {
11,247!
472
    vmCloseFailedVnode(pMgmt, req.vgId);
×
473

474
    vnodeClose(pImpl);
×
475
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
476
  } else {
477
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
11,247!
478
          TMSG_INFO(pMsg->msgType));
479
  }
480

481
  (void)tFreeSCreateVnodeReq(&req);
11,247✔
482
  terrno = code;
11,247✔
483
  return code;
11,247✔
484
}
485

486
#ifdef USE_MOUNT
487
typedef struct {
488
  int64_t dbId;
489
  int32_t vgId;
490
  int32_t diskPrimary;
491
} SMountDbVgId;
492
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
493
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
494

495
static int compareVnodeInfo(const void *p1, const void *p2) {
×
496
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
×
497
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
×
498

499
  if (v1->config.dbId == v2->config.dbId) {
×
500
    if (v1->config.vgId == v2->config.vgId) {
×
501
      return 0;
×
502
    }
503
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
×
504
  }
505

506
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
×
507
}
508
static int compareVgDiskPrimary(const void *p1, const void *p2) {
×
509
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
×
510
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
×
511

512
  if (v1->dbId == v2->dbId) {
×
513
    if (v1->vgId == v2->vgId) {
×
514
      return 0;
×
515
    }
516
    return v1->vgId > v2->vgId ? 1 : -1;
×
517
  }
518

519
  return v1->dbId > v2->dbId ? 1 : -1;
×
520
}
521

522
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
523
  int32_t   code = 0, lino = 0;
×
524
  TdFilePtr pFile = NULL;
×
525
  char     *content = NULL;
×
526
  SJson    *pJson = NULL;
×
527
  int64_t   size = 0;
×
528
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
×
529
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
×
530
  SArray   *pDisks = NULL;
×
531
  // step 1: fetch clusterId from dnode.json
532
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
×
533
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
×
534
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
×
535
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
×
536
  if (taosReadFile(pFile, content, size) != size) {
×
537
    TAOS_CHECK_EXIT(terrno);
×
538
  }
539
  content[size] = '\0';
×
540
  pJson = tjsonParse(content);
×
541
  if (pJson == NULL) {
×
542
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
543
  }
544
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
×
545
  TAOS_CHECK_EXIT(code);
×
546
  if (dropped == 1) {
×
547
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
548
  }
549
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
×
550
  TAOS_CHECK_EXIT(code);
×
551
  if (encryptScope != 0) {
×
552
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
553
  }
554
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
×
555
  TAOS_CHECK_EXIT(code);
×
556
  if (clusterId == 0) {
×
557
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
558
  }
559
  pMountInfo->clusterId = clusterId;
×
560
  if (content != NULL) taosMemoryFreeClear(content);
×
561
  if (pJson != NULL) {
×
562
    cJSON_Delete(pJson);
×
563
    pJson = NULL;
×
564
  }
565
  if (pFile != NULL) taosCloseFile(&pFile);
×
566
  // step 2: fetch dataDir from dnode/config/local.json
567
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
×
568
  int32_t nDisks = taosArrayGetSize(pDisks);
×
569
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
×
570
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
571
  }
572
  for (int32_t i = 0; i < nDisks; ++i) {
×
573
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
×
574
    if (!pMountInfo->pDisks[pDisk->level]) {
×
575
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
×
576
      if (!pMountInfo->pDisks[pDisk->level]) {
×
577
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
578
      }
579
    }
580
    char *pDir = taosStrdup(pDisk->dir);
×
581
    if (pDir == NULL) {
×
582
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
583
    }
584
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
×
585
      // put the primary disk to the first position of level 0
586
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
×
587
        taosMemFree(pDir);
×
588
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
589
      }
590
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
×
591
      taosMemFree(pDir);
×
592
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
593
    }
594
  }
595
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
×
596
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
×
597
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
×
598
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
599
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
600
    }
601
  }
602
_exit:
×
603
  if (content != NULL) taosMemoryFreeClear(content);
×
604
  if (pJson != NULL) cJSON_Delete(pJson);
×
605
  if (pFile != NULL) taosCloseFile(&pFile);
×
606
  if (pDisks != NULL) {
×
607
    taosArrayDestroy(pDisks);
×
608
    pDisks = NULL;
×
609
  }
610
  if (code != 0) {
×
611
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
612
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
613
  } else {
614
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
×
615
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
616
  }
617
  TAOS_RETURN(code);
×
618
}
619

620
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
621
  int32_t       code = 0, lino = 0;
×
622
  SWrapperCfg  *pCfgs = NULL;
×
623
  int32_t       numOfVnodes = 0;
×
624
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
×
625
  TdDirPtr      pDir = NULL;
×
626
  TdDirEntryPtr de = NULL;
×
627
  SVnodeMgmt    vnodeMgmt = {0};
×
628
  SArray       *pVgCfgs = NULL;
×
629
  SArray       *pDbInfos = NULL;
×
630
  SArray       *pDiskPrimarys = NULL;
×
631

632
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
633
  vnodeMgmt.path = path;
×
634
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
×
635
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
×
636
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
×
637
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
×
638

639
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
×
640
  int32_t nVgDropped = 0, j = 0;
×
641
  for (int32_t i = 0; i < numOfVnodes; ++i) {
×
642
    SWrapperCfg *pCfg = &pCfgs[i];
×
643
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
644
    if (nDiskLevel0 > 1) {
×
645
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
×
646
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
647
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
×
648
                     pCfg->vgId);
649
    }
650
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
×
651
    if (pCfg->dropped) {
×
652
      ++nVgDropped;
×
653
      continue;
×
654
    }
655
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
×
656
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
657
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
658
    }
659
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
×
660
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
×
661
    if (pInfo->config.syncCfg.replicaNum > 1) {
×
662
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
663
             pInfo->config.syncCfg.replicaNum);
664
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
665
    } else if (pInfo->config.vgId != pCfg->vgId) {
×
666
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
667
             pCfg->vgId);
668
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
669
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
×
670
               pInfo->config.walCfg.encryptAlgorithm) {
×
671
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
×
672
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
673
             pInfo->config.tsdbCfg.encryptAlgorithm);
674
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
675
    }
676
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
×
677
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
×
678
  }
679
  if (nVgDropped > 0) {
×
680
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
681
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
682
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
683
  }
684
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
×
685
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
×
686
  if (nVgCfg != nDiskPrimary) {
×
687
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
688
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
689
  }
690
  if (nVgCfg > 1) {
×
691
    taosArraySort(pVgCfgs, compareVnodeInfo);
×
692
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
×
693
  }
694

695
  int64_t clusterId = pMountInfo->clusterId;
×
696
  int64_t dbId = 0, vgId = 0, nDb = 0;
×
697
  for (int32_t i = 0; i < nVgCfg; ++i) {
×
698
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
×
699
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
×
700
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
×
701
             pInfo->config.syncCfg.nodeInfo->clusterId);
702
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
703
    }
704
    if (dbId != pInfo->config.dbId) {
×
705
      dbId = pInfo->config.dbId;
×
706
      ++nDb;
×
707
    }
708
    if (vgId == pInfo->config.vgId) {
×
709
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
710
    } else {
711
      vgId = pInfo->config.vgId;
×
712
    }
713
  }
714

715
  if (nDb > 0) {
×
716
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
×
717
    int32_t dbIdx = -1;
×
718
    for (int32_t i = 0; i < nVgCfg; ++i) {
×
719
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
×
720
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
×
721
      SMountDbInfo *pDbInfo = NULL;
×
722
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
×
723
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
×
724
        pDbInfo->dbId = pVgCfg->config.dbId;
×
725
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
×
726
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
×
727
      } else {
728
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
×
729
      }
730
      SMountVgInfo vgInfo = {
×
731
          .diskPrimary = pDiskPrimary->diskPrimary,
×
732
          .vgId = pVgCfg->config.vgId,
×
733
          .dbId = pVgCfg->config.dbId,
×
734
          .cacheLastSize = pVgCfg->config.cacheLastSize,
×
735
          .szPage = pVgCfg->config.szPage,
×
736
          .szCache = pVgCfg->config.szCache,
×
737
          .szBuf = pVgCfg->config.szBuf,
×
738
          .cacheLast = pVgCfg->config.cacheLast,
×
739
          .standby = pVgCfg->config.standby,
×
740
          .hashMethod = pVgCfg->config.hashMethod,
×
741
          .hashBegin = pVgCfg->config.hashBegin,
×
742
          .hashEnd = pVgCfg->config.hashEnd,
×
743
          .hashPrefix = pVgCfg->config.hashPrefix,
×
744
          .hashSuffix = pVgCfg->config.hashSuffix,
×
745
          .sttTrigger = pVgCfg->config.sttTrigger,
×
746
          .replications = pVgCfg->config.syncCfg.replicaNum,
×
747
          .precision = pVgCfg->config.tsdbCfg.precision,
×
748
          .compression = pVgCfg->config.tsdbCfg.compression,
×
749
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
×
750
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
×
751
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
×
752
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
×
753
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
×
754
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
×
755
          .minRows = pVgCfg->config.tsdbCfg.minRows,
×
756
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
×
757
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
×
758
          .ssChunkSize = pVgCfg->config.ssChunkSize,
×
759
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
×
760
          .ssCompact = pVgCfg->config.ssCompact,
×
761
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
×
762
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
×
763
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
×
764
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
×
765
          .walSegSize = pVgCfg->config.walCfg.segSize,
×
766
          .walLevel = pVgCfg->config.walCfg.level,
×
767
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
×
768
          .committed = pVgCfg->state.committed,
×
769
          .commitID = pVgCfg->state.commitID,
×
770
          .commitTerm = pVgCfg->state.commitTerm,
×
771
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
×
772
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
×
773
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
×
774
      };
775
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
×
776
    }
777
  }
778

779
  pMountInfo->pDbs = pDbInfos;
×
780

781
_exit:
×
782
  if (code != 0) {
×
783
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
784
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
785
  }
786
  taosArrayDestroy(pDiskPrimarys);
×
787
  taosArrayDestroy(pVgCfgs);
×
788
  taosMemoryFreeClear(pCfgs);
×
789
  TAOS_RETURN(code);
×
790
}
791

792
/**
793
 *   Retrieve the stables from vnode meta.
794
 */
795
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
796
  int32_t code = 0, lino = 0;
×
797
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
×
798
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
×
799
  SArray *suidList = NULL;
×
800
  SArray *pCols = NULL;
×
801
  SArray *pTags = NULL;
×
802
  SArray *pColExts = NULL;
×
803
  SArray *pTagExts = NULL;
×
804

805
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
806
  for (int32_t i = 0; i < nDb; ++i) {
×
807
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
×
808
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
×
809
    for (int32_t j = 0; j < nVg; ++j) {
×
810
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
×
811
      SVnode        vnode = {
×
812
                 .config.vgId = pVgInfo->vgId,
×
813
                 .config.dbId = pVgInfo->dbId,
×
814
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
×
815
                 .config.szPage = pVgInfo->szPage,
×
816
                 .config.szCache = pVgInfo->szCache,
×
817
                 .config.szBuf = pVgInfo->szBuf,
×
818
                 .config.cacheLast = pVgInfo->cacheLast,
×
819
                 .config.standby = pVgInfo->standby,
×
820
                 .config.hashMethod = pVgInfo->hashMethod,
×
821
                 .config.hashBegin = pVgInfo->hashBegin,
×
822
                 .config.hashEnd = pVgInfo->hashEnd,
×
823
                 .config.hashPrefix = pVgInfo->hashPrefix,
×
824
                 .config.hashSuffix = pVgInfo->hashSuffix,
×
825
                 .config.sttTrigger = pVgInfo->sttTrigger,
×
826
                 .config.syncCfg.replicaNum = pVgInfo->replications,
×
827
                 .config.tsdbCfg.precision = pVgInfo->precision,
×
828
                 .config.tsdbCfg.compression = pVgInfo->compression,
×
829
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
×
830
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
×
831
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
×
832
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
×
833
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
×
834
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
×
835
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
×
836
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
×
837
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
×
838
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
×
839
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
×
840
                 .config.ssCompact = pVgInfo->ssCompact,
×
841
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
×
842
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
×
843
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
×
844
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
×
845
                 .config.walCfg.segSize = pVgInfo->walSegSize,
×
846
                 .config.walCfg.level = pVgInfo->walLevel,
×
847
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
×
848
                 .diskPrimary = pVgInfo->diskPrimary,
×
849
      };
850
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
×
851
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
×
852
               pVgInfo->vgId);
853
      vnode.path = path;
×
854

855
      int32_t rollback = vnodeShouldRollback(&vnode);
×
856
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
×
857
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
858
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
859
        TAOS_CHECK_EXIT(code);
×
860
      } else {
861
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
×
862
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
863

864
        SMetaReader mr = {0};
×
865
        tb_uid_t    suid = 0;
×
866
        SMeta      *pMeta = vnode.pMeta;
×
867

868
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
×
869
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
870
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
871
        }
872
        taosArrayClear(suidList);
×
873
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
×
874
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
×
875
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
876
        int32_t nStbs = taosArrayGetSize(suidList);
×
877
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
×
878
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
879
        }
880
        for (int32_t i = 0; i < nStbs; ++i) {
×
881
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
×
882
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
×
883
                pVgInfo->dbId, suid, pReq->dnodeId);
884
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
×
885
            TSDB_CHECK_CODE(code, lino, _exit0);
×
886
          }
887
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
×
888
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
×
889
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
890
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
891
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
892
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
893
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
894
          }
895
          SMountStbInfo stbInfo = {
×
896
              .req.source = TD_REQ_FROM_APP,
897
              .req.suid = suid,
898
              .req.colVer = mr.me.stbEntry.schemaRow.version,
×
899
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
×
900
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
×
901
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
×
902
          };
903
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
×
904
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
×
905
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
906
          }
907
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
×
908
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
909
          }
910

911
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
×
912
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
913
          }
914
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
×
915
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
916
          }
917
          taosArrayClear(pCols);
×
918
          taosArrayClear(pTags);
×
919
          taosArrayClear(pColExts);
×
920
          taosArrayClear(pTagExts);
×
921
          stbInfo.req.pColumns = pCols;
×
922
          stbInfo.req.pTags = pTags;
×
923
          stbInfo.pColExts = pColExts;
×
924
          stbInfo.pTagExts = pTagExts;
×
925

926
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
×
927
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
×
928
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
×
929
            SFieldWithOptions col = {
×
930
                .type = pSchema->type,
×
931
                .flags = pSchema->flags,
×
932
                .bytes = pSchema->bytes,
×
933
                .compress = pColComp->alg,
×
934
            };
935
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
×
936
            if (pSchema->colId != pColComp->id) {
×
937
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
938
            }
939
            if (mr.me.pExtSchemas) {
×
940
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
941
            }
942
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
×
943
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
×
944
          }
945
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
×
946
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
×
947
            SField   tag = {
×
948
                  .type = pSchema->type,
×
949
                  .flags = pSchema->flags,
×
950
                  .bytes = pSchema->bytes,
×
951
            };
952
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
×
953
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
×
954
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
×
955
          }
956
          tDecoderClear(&mr.coder);
×
957

958
          // serialize the SMountStbInfo
959
          int32_t firstPartLen = 0;
×
960
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
×
961
          if (msgLen <= 0) {
×
962
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
963
          }
964
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
×
965
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
×
966
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
×
967
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
×
968
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
×
969
            taosMemoryFree(pBuf);
×
970
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
971
          }
972
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
×
973
            taosMemoryFree(pBuf);
×
974
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
975
          }
976
        }
977
      _exit0:
×
978
        metaReaderClear(&mr);
×
979
        metaClose(&vnode.pMeta);
×
980
        TAOS_CHECK_EXIT(code);
×
981
      }
982
      break;  // retrieve stbs from one vnode is enough
×
983
    }
984
  }
985
_exit:
×
986
  if (code != 0) {
×
987
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
988
           pReq->dnodeId, tstrerror(code), path);
989
  }
990
  taosArrayDestroy(suidList);
×
991
  taosArrayDestroy(pCols);
×
992
  taosArrayDestroy(pTags);
×
993
  taosArrayDestroy(pColExts);
×
994
  taosArrayDestroy(pTagExts);
×
995
  TAOS_RETURN(code);
×
996
}
997

998
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
×
999
  int32_t code = 0, lino = 0;
×
1000
  int32_t retryTimes = 0;
×
1001
  char    filepath[PATH_MAX] = {0};
×
1002
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
×
1003
  TSDB_CHECK_NULL((*pFile = taosOpenFile(filepath, TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)), code, lino, _exit,
×
1004
                  terrno);
1005
  int32_t ret = 0;
×
1006
  do {
1007
    ret = taosLockFile(*pFile);
×
1008
    if (ret == 0) break;
×
1009
    taosMsleep(1000);
×
1010
    ++retryTimes;
×
1011
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret),
×
1012
           retryTimes);
1013
  } while (retryTimes < retryLimit);
×
1014
  TAOS_CHECK_EXIT(ret);
×
1015
_exit:
×
1016
  if (code != 0) {
×
1017
    taosCloseFile(pFile);
×
1018
    *pFile = NULL;
×
1019
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
×
1020
           filepath);
1021
  }
1022
  TAOS_RETURN(code);
×
1023
}
1024

1025
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
1026
  int32_t code = 0, lino = 0;
×
1027
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
×
1028
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1029
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
×
1030
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
×
1031
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1032
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
×
1033
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1034
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
1035
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1036
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
×
1037
           TD_DIRSEP);
1038
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1039
_exit:
×
1040
  if (code != 0) {
×
1041
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
1042
           pReq->dnodeId, tstrerror(code), path);
1043
  }
1044
  TAOS_RETURN(code);
×
1045
}
1046

1047
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
×
1048
                                       SMountInfo *pMountInfo) {
1049
  int32_t code = 0, lino = 0;
×
1050
  pMountInfo->dnodeId = pReq->dnodeId;
×
1051
  pMountInfo->mountUid = pReq->mountUid;
×
1052
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
×
1053
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
×
1054
  pMountInfo->ignoreExist = pReq->ignoreExist;
×
1055
  pMountInfo->valLen = pReq->valLen;
×
1056
  pMountInfo->pVal = pReq->pVal;
×
1057
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
×
1058
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
×
1059
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
×
1060
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
×
1061
_exit:
×
1062
  if (code != 0) {
×
1063
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
1064
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1065
  }
1066
  TAOS_RETURN(code);
×
1067
}
1068

1069
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1070
  int32_t               code = 0, lino = 0;
×
1071
  int32_t               rspCode = 0;
×
1072
  SVnodeMgmt            vndMgmt = {0};
×
1073
  SMountInfo            mountInfo = {0};
×
1074
  void                 *pBuf = NULL;
×
1075
  int32_t               bufLen = 0;
×
1076
  SRetrieveMountPathReq req = {0};
×
1077

1078
  vndMgmt = *pMgmt;
×
1079
  vndMgmt.path = NULL;
×
1080
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
×
1081
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
×
1082
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
×
1083
_end:
×
1084
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
×
1085
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
×
1086
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
×
1087
  pMsg->info.rsp = pBuf;
×
1088
  pMsg->info.rspLen = bufLen;
×
1089
_exit:
×
1090
  if (rspCode != 0) {
×
1091
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1092
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1093
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1094
    rpcFreeCont(pBuf);
×
1095
    code = rspCode;
×
1096
  } else if (code != 0) {
×
1097
    // the client would receive the response with error msg
1098
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
×
1099
           req.dnodeId, tstrerror(code), req.mountPath);
1100
  } else {
1101
    int32_t nVgs = 0;
×
1102
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
×
1103
    for (int32_t i = 0; i < nDbs; ++i) {
×
1104
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
×
1105
      nVgs += taosArrayGetSize(pDb->pVgs);
×
1106
    }
1107
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
×
1108
  }
1109
  taosMemFreeClear(vndMgmt.path);
×
1110
  tFreeMountInfo(&mountInfo, false);
×
1111
  TAOS_RETURN(code);
×
1112
}
1113

1114
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
×
1115
                            SMountVnodeReq *req, STfs *pMountTfs) {
1116
  int32_t    code = 0;
×
1117
  SVnodeInfo info = {0};
×
1118
  char       hostDir[TSDB_FILENAME_LEN] = {0};
×
1119
  char       mountDir[TSDB_FILENAME_LEN] = {0};
×
1120
  char       mountVnode[32] = {0};
×
1121

1122
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
×
1123
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1124
    return code;
×
1125
  }
1126

1127
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
×
1128
  if ((code = taosMkDir(hostDir))) {
×
1129
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1130
           tstrerror(code), hostDir);
1131
    return code;
×
1132
  }
1133

1134
  info.config = *pCfg;  // copy the config
×
1135
  info.state.committed = req->committed;
×
1136
  info.state.commitID = req->commitID;
×
1137
  info.state.commitTerm = req->commitTerm;
×
1138
  info.state.applied = req->committed;
×
1139
  info.state.applyTerm = req->commitTerm;
×
1140
  info.config.vndStats.numOfSTables = req->numOfSTables;
×
1141
  info.config.vndStats.numOfCTables = req->numOfCTables;
×
1142
  info.config.vndStats.numOfNTables = req->numOfNTables;
×
1143

1144
  SVnodeInfo oldInfo = {0};
×
1145
  oldInfo.config = vnodeCfgDefault;
×
1146
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
×
1147
    if (oldInfo.config.dbId != info.config.dbId) {
×
1148
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1149
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1150
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1151
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1152
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1153
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1154

1155
    } else {
1156
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1157
    }
1158
    return code;
×
1159
  }
1160

1161
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
×
1162
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
×
1163
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
×
1164
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
×
1165
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1166
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
×
1167
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
×
1168
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
×
1169
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
×
1170
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1171
             mountSubDir, hostSubDir, tstrerror(code));
1172
      return code;
×
1173
    }
1174
  }
1175
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
×
1176
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
×
1177
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1178
           req->mountName, tstrerror(code), hostDir);
1179
    return code;
×
1180
  }
1181
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
×
1182
  return 0;
×
1183
}
1184

1185
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1186
  int32_t          code = 0, lino = 0;
×
1187
  SMountVnodeReq   req = {0};
×
1188
  SCreateVnodeReq *pCreateReq = &req.createReq;
×
1189
  SVnodeCfg        vnodeCfg = {0};
×
1190
  SWrapperCfg      wrapperCfg = {0};
×
1191
  SVnode          *pImpl = NULL;
×
1192
  STfs            *pMountTfs = NULL;
×
1193
  char             path[TSDB_FILENAME_LEN] = {0};
×
1194
  bool             releaseTfs = false;
×
1195

1196
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1197
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1198
    return TSDB_CODE_INVALID_MSG;
×
1199
  }
1200

1201
  if (pCreateReq->learnerReplica == 0) {
×
1202
    pCreateReq->learnerSelfIndex = -1;
×
1203
  }
1204
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
×
1205
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1206
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1207
  }
1208
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
×
1209
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1210
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1211
  }
1212

1213
  SReplica *pReplica = NULL;
×
1214
  if (pCreateReq->selfIndex != -1) {
×
1215
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
×
1216
  } else {
1217
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1218
  }
1219
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
×
1220
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
×
1221
    (void)tFreeSMountVnodeReq(&req);
×
1222
    code = TSDB_CODE_INVALID_MSG;
×
1223
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1224
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1225
    return code;
×
1226
  }
1227
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
×
1228
  vnodeCfg.mountVgId = req.mountVgId;
×
1229
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
×
1230
  wrapperCfg.mountId = req.mountId;
×
1231

1232
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
×
1233
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
×
1234
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1235
    (void)tFreeSMountVnodeReq(&req);
×
1236
    vmReleaseVnode(pMgmt, pVnode);
×
1237
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1238
    return 0;
×
1239
  }
1240
  vmReleaseVnode(pMgmt, pVnode);
×
1241

1242
  wrapperCfg.diskPrimary = req.diskPrimary;
×
1243
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
×
1244
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
×
1245
  releaseTfs = true;
×
1246

1247
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
×
1248
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
×
1249
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1250
  }
1251
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
×
1252
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1253
  }
1254
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
×
1255
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
×
1256
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
×
1257
_exit:
×
1258
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
×
1259
  if (code != 0) {
×
1260
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1261
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1262
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1263
    vnodeClose(pImpl);
×
1264
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1265
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1266
  } else {
1267
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
×
1268
          TMSG_INFO(pMsg->msgType));
1269
  }
1270

1271
  pMsg->code = code;
×
1272
  pMsg->info.rsp = NULL;
×
1273
  pMsg->info.rspLen = 0;
×
1274

1275
  (void)tFreeSMountVnodeReq(&req);
×
1276
  TAOS_RETURN(code);
×
1277
}
1278
#endif  // USE_MOUNT
1279

1280
// alter replica doesn't use this, but restore dnode still use this
1281
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
4,944✔
1282
  SAlterVnodeTypeReq req = {0};
4,944✔
1283
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
4,944!
1284
    terrno = TSDB_CODE_INVALID_MSG;
×
1285
    return -1;
×
1286
  }
1287

1288
  if (req.learnerReplicas == 0) {
1289
    req.learnerSelfIndex = -1;
1290
  }
1291

1292
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
4,944!
1293
        TMSG_INFO(pMsg->msgType));
1294

1295
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
4,944✔
1296
  if (pVnode == NULL) {
4,944!
1297
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1298
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1299
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1300
    return -1;
×
1301
  }
1302

1303
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
4,944✔
1304
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
4,944!
1305
  if (role == TAOS_SYNC_ROLE_VOTER) {
4,944!
1306
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1307
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1308
    vmReleaseVnode(pMgmt, pVnode);
×
1309
    return -1;
×
1310
  }
1311

1312
  dInfo("vgId:%d, checking node catch up", req.vgId);
4,944!
1313
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
4,944✔
1314
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
4,698✔
1315
    vmReleaseVnode(pMgmt, pVnode);
4,698✔
1316
    return -1;
4,698✔
1317
  }
1318

1319
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
246!
1320

1321
  int32_t vgId = req.vgId;
246✔
1322
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
246!
1323
        req.selfIndex, req.strict, req.changeVersion);
1324
  for (int32_t i = 0; i < req.replica; ++i) {
1,004✔
1325
    SReplica *pReplica = &req.replicas[i];
758✔
1326
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
758!
1327
  }
1328
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
246!
1329
    SReplica *pReplica = &req.learnerReplicas[i];
×
1330
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1331
  }
1332

1333
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
246!
1334
      req.learnerSelfIndex >= req.learnerReplica) {
246!
1335
    terrno = TSDB_CODE_INVALID_MSG;
×
1336
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1337
    vmReleaseVnode(pMgmt, pVnode);
×
1338
    return -1;
×
1339
  }
1340

1341
  SReplica *pReplica = NULL;
246✔
1342
  if (req.selfIndex != -1) {
246!
1343
    pReplica = &req.replicas[req.selfIndex];
246✔
1344
  } else {
1345
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1346
  }
1347

1348
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
246!
1349
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
246!
1350
    terrno = TSDB_CODE_INVALID_MSG;
×
1351
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
1352
           pReplica->port);
1353
    vmReleaseVnode(pMgmt, pVnode);
×
1354
    return -1;
×
1355
  }
1356

1357
  dInfo("vgId:%d, start to close vnode", vgId);
246!
1358
  SWrapperCfg wrapperCfg = {
246✔
1359
      .dropped = pVnode->dropped,
246✔
1360
      .vgId = pVnode->vgId,
246✔
1361
      .vgVersion = pVnode->vgVersion,
246✔
1362
      .diskPrimary = pVnode->diskPrimary,
246✔
1363
  };
1364
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
246✔
1365

1366
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
246✔
1367
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
246✔
1368

1369
  int32_t diskPrimary = wrapperCfg.diskPrimary;
246✔
1370
  char    path[TSDB_FILENAME_LEN] = {0};
246✔
1371
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
246✔
1372

1373
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
246!
1374
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
246!
1375
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1376
    return -1;
×
1377
  }
1378

1379
  dInfo("vgId:%d, begin to open vnode", vgId);
246!
1380
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
246✔
1381
  if (pImpl == NULL) {
246!
1382
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1383
    return -1;
×
1384
  }
1385

1386
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
246!
1387
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1388
    return -1;
×
1389
  }
1390

1391
  if (vnodeStart(pImpl) != 0) {
246!
1392
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1393
    return -1;
×
1394
  }
1395

1396
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
246!
1397
        req.vgId, TMSG_INFO(pMsg->msgType));
1398
  return 0;
246✔
1399
}
1400

1401
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1402
  SCheckLearnCatchupReq req = {0};
×
1403
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1404
    terrno = TSDB_CODE_INVALID_MSG;
×
1405
    return -1;
×
1406
  }
1407

1408
  if (req.learnerReplicas == 0) {
1409
    req.learnerSelfIndex = -1;
1410
  }
1411

1412
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1413
        TMSG_INFO(pMsg->msgType));
1414

1415
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1416
  if (pVnode == NULL) {
×
1417
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1418
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1419
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1420
    return -1;
×
1421
  }
1422

1423
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1424
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1425
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1426
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1427
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1428
    vmReleaseVnode(pMgmt, pVnode);
×
1429
    return -1;
×
1430
  }
1431

1432
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1433
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1434
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1435
    vmReleaseVnode(pMgmt, pVnode);
×
1436
    return -1;
×
1437
  }
1438

1439
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1440

1441
  vmReleaseVnode(pMgmt, pVnode);
×
1442

1443
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1444
        TMSG_INFO(pMsg->msgType));
1445

1446
  return 0;
×
1447
}
1448

1449
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
34✔
1450
  SDisableVnodeWriteReq req = {0};
34✔
1451
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
34!
1452
    terrno = TSDB_CODE_INVALID_MSG;
×
1453
    return -1;
×
1454
  }
1455

1456
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
34!
1457

1458
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
34✔
1459
  if (pVnode == NULL) {
34!
1460
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1461
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1462
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1463
    return -1;
×
1464
  }
1465

1466
  pVnode->disable = req.disable;
34✔
1467
  vmReleaseVnode(pMgmt, pVnode);
34✔
1468
  return 0;
34✔
1469
}
1470

1471
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
32✔
1472
  SAlterVnodeHashRangeReq req = {0};
32✔
1473
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
32!
1474
    terrno = TSDB_CODE_INVALID_MSG;
×
1475
    return -1;
×
1476
  }
1477

1478
  int32_t srcVgId = req.srcVgId;
32✔
1479
  int32_t dstVgId = req.dstVgId;
32✔
1480

1481
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
32✔
1482
  if (pVnode != NULL) {
32!
1483
    dError("vgId:%d, vnode already exist", dstVgId);
×
1484
    vmReleaseVnode(pMgmt, pVnode);
×
1485
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1486
    return -1;
×
1487
  }
1488

1489
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
32!
1490
        req.dstVgId);
1491
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
32✔
1492
  if (pVnode == NULL) {
32!
1493
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1494
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1495
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1496
    return -1;
×
1497
  }
1498

1499
  SWrapperCfg wrapperCfg = {
32✔
1500
      .dropped = pVnode->dropped,
32✔
1501
      .vgId = dstVgId,
1502
      .vgVersion = pVnode->vgVersion,
32✔
1503
      .diskPrimary = pVnode->diskPrimary,
32✔
1504
  };
1505
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
32✔
1506

1507
  // prepare alter
1508
  pVnode->toVgId = dstVgId;
32✔
1509
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
32!
1510
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1511
    return -1;
×
1512
  }
1513

1514
  dInfo("vgId:%d, close vnode", srcVgId);
32!
1515
  vmCloseVnode(pMgmt, pVnode, true, false);
32✔
1516

1517
  int32_t diskPrimary = wrapperCfg.diskPrimary;
32✔
1518
  char    srcPath[TSDB_FILENAME_LEN] = {0};
32✔
1519
  char    dstPath[TSDB_FILENAME_LEN] = {0};
32✔
1520
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
32✔
1521
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
32✔
1522

1523
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
32!
1524
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
32!
1525
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1526
    return -1;
×
1527
  }
1528

1529
  dInfo("vgId:%d, open vnode", dstVgId);
32!
1530
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
32✔
1531

1532
  if (pImpl == NULL) {
32!
1533
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1534
    return -1;
×
1535
  }
1536

1537
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
32!
1538
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1539
    return -1;
×
1540
  }
1541

1542
  if (vnodeStart(pImpl) != 0) {
32!
1543
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1544
    return -1;
×
1545
  }
1546

1547
  // complete alter
1548
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
32!
1549
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1550
    return -1;
×
1551
  }
1552

1553
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
32!
1554
  return 0;
32✔
1555
}
1556

1557
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,806✔
1558
  SAlterVnodeReplicaReq alterReq = {0};
1,806✔
1559
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
1,806!
1560
    terrno = TSDB_CODE_INVALID_MSG;
×
1561
    return -1;
×
1562
  }
1563

1564
  if (alterReq.learnerReplica == 0) {
1,806✔
1565
    alterReq.learnerSelfIndex = -1;
1,315✔
1566
  }
1567

1568
  int32_t vgId = alterReq.vgId;
1,806✔
1569
  dInfo(
1,806!
1570
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1571
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1572
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1573
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1574

1575
  for (int32_t i = 0; i < alterReq.replica; ++i) {
7,149✔
1576
    SReplica *pReplica = &alterReq.replicas[i];
5,343✔
1577
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
5,343!
1578
  }
1579
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
2,297✔
1580
    SReplica *pReplica = &alterReq.learnerReplicas[i];
491✔
1581
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
491!
1582
  }
1583

1584
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
1,806!
1585
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
1,806!
1586
    terrno = TSDB_CODE_INVALID_MSG;
×
1587
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1588
    return -1;
×
1589
  }
1590

1591
  SReplica *pReplica = NULL;
1,806✔
1592
  if (alterReq.selfIndex != -1) {
1,806!
1593
    pReplica = &alterReq.replicas[alterReq.selfIndex];
1,806✔
1594
  } else {
1595
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1596
  }
1597

1598
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,806!
1599
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,806!
1600
    terrno = TSDB_CODE_INVALID_MSG;
×
1601
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
1602
           pReplica->port);
1603
    return -1;
×
1604
  }
1605

1606
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
1,806✔
1607
  if (pVnode == NULL) {
1,806!
1608
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1609
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1610
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1611
    return -1;
×
1612
  }
1613

1614
  dInfo("vgId:%d, start to close vnode", vgId);
1,806!
1615
  SWrapperCfg wrapperCfg = {
1,806✔
1616
      .dropped = pVnode->dropped,
1,806✔
1617
      .vgId = pVnode->vgId,
1,806✔
1618
      .vgVersion = pVnode->vgVersion,
1,806✔
1619
      .diskPrimary = pVnode->diskPrimary,
1,806✔
1620
  };
1621
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
1,806✔
1622

1623
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
1,806✔
1624
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
1,806✔
1625

1626
  int32_t diskPrimary = wrapperCfg.diskPrimary;
1,806✔
1627
  char    path[TSDB_FILENAME_LEN] = {0};
1,806✔
1628
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
1,806✔
1629

1630
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
1,806!
1631
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
1,806!
1632
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1633
    return -1;
×
1634
  }
1635

1636
  dInfo("vgId:%d, begin to open vnode", vgId);
1,806!
1637
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
1,806✔
1638
  if (pImpl == NULL) {
1,806!
1639
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1640
    return -1;
×
1641
  }
1642

1643
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
1,806!
1644
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1645
    return -1;
×
1646
  }
1647

1648
  if (vnodeStart(pImpl) != 0) {
1,806!
1649
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1650
    return -1;
×
1651
  }
1652

1653
  dInfo(
1,806!
1654
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1655
      "learnerSelfIndex:%d strict:%d",
1656
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1657
      alterReq.learnerSelfIndex, alterReq.strict);
1658
  return 0;
1,806✔
1659
}
1660

1661
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
4,613✔
1662
  int32_t       code = 0;
4,613✔
1663
  SDropVnodeReq dropReq = {0};
4,613✔
1664
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
4,613!
1665
    terrno = TSDB_CODE_INVALID_MSG;
×
1666
    return terrno;
×
1667
  }
1668

1669
  int32_t vgId = dropReq.vgId;
4,613✔
1670
  dInfo("vgId:%d, start to drop vnode", vgId);
4,613!
1671

1672
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
4,613!
1673
    terrno = TSDB_CODE_INVALID_MSG;
×
1674
    dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
×
1675
    return terrno;
×
1676
  }
1677

1678
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
4,613✔
1679
  if (pVnode == NULL) {
4,613!
1680
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1681
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1682
    return terrno;
×
1683
  }
1684

1685
  pVnode->dropped = 1;
4,613✔
1686
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
4,613!
1687
    pVnode->dropped = 0;
×
1688
    vmReleaseVnode(pMgmt, pVnode);
×
1689
    return code;
×
1690
  }
1691

1692
  vmCloseVnode(pMgmt, pVnode, false, false);
4,613✔
1693
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
4,613!
1694
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1695
  }
1696

1697
  dInfo("vgId:%d, is dropped", vgId);
4,613!
1698
  return 0;
4,613✔
1699
}
1700

1701
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
84✔
1702
  SVArbHeartBeatReq arbHbReq = {0};
84✔
1703
  SVArbHeartBeatRsp arbHbRsp = {0};
84✔
1704
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
84!
1705
    terrno = TSDB_CODE_INVALID_MSG;
×
1706
    return -1;
×
1707
  }
1708

1709
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
84!
1710
    terrno = TSDB_CODE_INVALID_MSG;
×
1711
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
×
1712
    goto _OVER;
×
1713
  }
1714

1715
  if (strlen(arbHbReq.arbToken) == 0) {
84!
1716
    terrno = TSDB_CODE_INVALID_MSG;
×
1717
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1718
    goto _OVER;
×
1719
  }
1720

1721
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
84✔
1722

1723
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
84✔
1724
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
84✔
1725
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
84✔
1726
  if (arbHbRsp.hbMembers == NULL) {
84!
1727
    goto _OVER;
×
1728
  }
1729

1730
  for (int32_t i = 0; i < size; i++) {
201✔
1731
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
117✔
1732
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
117✔
1733
    if (pVnode == NULL) {
117!
1734
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
×
1735
      continue;
×
1736
    }
1737

1738
    SVArbHbRspMember rspMember = {0};
117✔
1739
    rspMember.vgId = pReqMember->vgId;
117✔
1740
    rspMember.hbSeq = pReqMember->hbSeq;
117✔
1741
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
117!
1742
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1743
      vmReleaseVnode(pMgmt, pVnode);
×
1744
      continue;
×
1745
    }
1746

1747
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
117!
1748
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1749
      vmReleaseVnode(pMgmt, pVnode);
×
1750
      continue;
×
1751
    }
1752

1753
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
234!
1754
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1755
      vmReleaseVnode(pMgmt, pVnode);
×
1756
      goto _OVER;
×
1757
    }
1758

1759
    vmReleaseVnode(pMgmt, pVnode);
117✔
1760
  }
1761

1762
  SRpcMsg rspMsg = {.info = pMsg->info};
84✔
1763
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
84✔
1764
  if (rspLen < 0) {
84!
1765
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1766
    goto _OVER;
×
1767
  }
1768

1769
  void *pRsp = rpcMallocCont(rspLen);
84✔
1770
  if (pRsp == NULL) {
84!
1771
    terrno = terrno;
×
1772
    goto _OVER;
×
1773
  }
1774

1775
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
84!
1776
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1777
    rpcFreeCont(pRsp);
×
1778
    goto _OVER;
×
1779
  }
1780
  pMsg->info.rsp = pRsp;
84✔
1781
  pMsg->info.rspLen = rspLen;
84✔
1782

1783
  terrno = TSDB_CODE_SUCCESS;
84✔
1784

1785
_OVER:
84✔
1786
  tFreeSVArbHeartBeatReq(&arbHbReq);
84✔
1787
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
84✔
1788
  return terrno;
84✔
1789
}
1790

1791
SArray *vmGetMsgHandles() {
3,073✔
1792
  int32_t code = -1;
3,073✔
1793
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
3,073✔
1794
  if (pArray == NULL) goto _OVER;
3,073!
1795

1796
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1797
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1798
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1799
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1800
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1801
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1802
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1803
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1804
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1805
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1806
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1807
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1812
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1813
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1814
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1815
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1816
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1817
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1818
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1824
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1825
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1832
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
3,073!
1834
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1835
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1836
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1837
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1838
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1839
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1840
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1841
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1842
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1843
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1844
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1845
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
3,073!
1846
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1847

1848
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1849
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1850
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
3,073!
1851
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1852
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1853
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1854
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1855
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1856
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1857
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1858
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1859
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1860
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1861
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1862
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1863
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1864
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
3,073!
1865
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
3,073!
1866

1867
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, vmPutMsgToStreamChkQueue, 0) == NULL) goto _OVER;
3,073!
1868
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
3,073!
1869
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
3,073!
1870
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
3,073!
1871

1872
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1873
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1874
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1875
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1876
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1877
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1878
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1881

1882
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1883
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1884
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1885
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1886
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1887
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1891
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1892
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1893
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1894
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1895
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1896
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1897
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1898

1899
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1900
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1901
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1902
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1903
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1904
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1905
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1906
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1907
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1908
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1909
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1910
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1911

1912
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
3,073!
1913
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
3,073!
1914
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
3,073!
1915
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
3,073!
1916
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
3,073!
1917

1918
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
3,073!
1919
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
3,073!
1920
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
3,073!
1921

1922
  code = 0;
3,073✔
1923

1924
_OVER:
3,073✔
1925
  if (code != 0) {
3,073!
1926
    taosArrayDestroy(pArray);
×
1927
    return NULL;
×
1928
  } else {
1929
    return pArray;
3,073✔
1930
  }
1931
}
1932

1933
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1934
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1935

1936
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1937
  while (pIter) {
×
1938
    SVnodeObj **ppVnode = pIter;
×
1939
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1940
      continue;
×
1941
    }
1942

1943
    SVnodeObj *pVnode = *ppVnode;
×
1944
    if (!pVnode->failed) {
×
1945
      SRawWriteMetrics metrics = {0};
×
1946
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1947
        // Add the metrics to the global metrics system with cluster ID
1948
        SName   name = {0};
×
1949
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1950
        if (code < 0) {
×
1951
          dError("failed to get db name since %s", tstrerror(code));
×
1952
          continue;
×
1953
        }
1954
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1955
        if (code != TSDB_CODE_SUCCESS) {
×
1956
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1957
        } else {
1958
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1959
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1960
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1961
          }
1962
        }
1963
      } else {
1964
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1965
      }
1966
    }
1967
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1968
  }
1969

1970
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1971
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc