• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4674

18 Aug 2025 07:58AM UTC coverage: 59.821% (+0.1%) from 59.715%
#4674

push

travis-ci

web-flow
test: update case desc (#32551)

136937 of 292075 branches covered (46.88%)

Branch coverage included in aggregate %.

207916 of 284395 relevant lines covered (73.11%)

4553289.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.39
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
/*
 * Collect one SVnodeLoad record per entry of the running-vnode hash.
 *
 * pMgmt   - vnode management object; its running hash is walked under the read lock.
 * pInfo   - output; pInfo->pVloads receives a newly created array of SVnodeLoad.
 *           It is left NULL when the array cannot be created.
 * isReset - when true, vnodeResetLoad() is called for every non-failed vnode
 *           after its load has been read.
 *
 * A failed vnode still contributes a record that carries only its vgId.
 * Failures of vnodeGetLoad()/taosArrayPush() are logged and the walk continues.
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header so that `continue` can never
  // skip the advance and spin forever on the same slot while holding the lock.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
58

59
/*
 * Collect a SVnodeLoadLite record for every non-failed vnode in the running hash.
 *
 * pMgmt - vnode management object; its running hash is walked under the read lock.
 * pInfo - output; pInfo->pVloads receives a newly created array of SVnodeLoadLite.
 *         It is NULL when the array cannot be created, and it is destroyed and
 *         reset to NULL when appending a record fails.
 *
 * Vnodes for which vnodeGetLoadLite() reports an error are skipped silently.
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header so that `continue` can never
  // skip the advance and spin forever on the same slot while holding the lock.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // NOTE(review): the walk is abandoned with an iterator still outstanding;
      // confirm whether the hash API expects the iteration to be cancelled here.
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
86

87
/*
 * Aggregate the per-vnode loads into pInfo->vstat and mirror the same totals
 * into pMgmt->state, then fill pInfo->tfs from the tiered file system.
 *
 * The loads are fetched with isReset = true, so the request counters read here
 * are reset in the vnodes afterwards. Nothing is written when the load array
 * cannot be obtained.
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoads = loadInfo.pVloads;
  if (pLoads == NULL) return;

  int32_t nVnodes = 0;
  int32_t nLeaders = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pLoads); ++idx) {
    SVnodeLoad *pItem = taosArrayGet(pLoads, idx);

    selectReqs += pItem->numOfSelectReqs;
    insertReqs += pItem->numOfInsertReqs;
    insertOkReqs += pItem->numOfInsertSuccessReqs;
    batchInsertReqs += pItem->numOfBatchInsertReqs;
    batchInsertOkReqs += pItem->numOfBatchInsertSuccessReqs;

    // both plain and assigned leaders count as masters
    bool isLeader =
        (pItem->syncState == TAOS_SYNC_STATE_LEADER) || (pItem->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      ++nLeaders;
    }
    ++nVnodes;
  }

  // monitor report (the insert counters are deltas)
  pInfo->vstat.totalVnodes = nVnodes;
  pInfo->vstat.masterNum = nLeaders;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  // cached copy kept on the management object
  pMgmt->state.totalVnodes = nVnodes;
  pMgmt->state.masterNum = nLeaders;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
135

136
/*
 * Delete insert-counter samples whose vgroup id is no longer present in the
 * running-vnode hash.
 *
 * The key/vgroup-id lists come from taos_counter_get_vgroup_ids(); the two
 * list arrays are released here once the sweep is done.
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int nKeys = taos_counter_get_keys_size(tsInsertCounter);
  if (nKeys == 0) return;

  int32_t *vgIds;
  char   **sampleKeys;
  int      rc = 0;

  rc = taos_counter_get_vgroup_ids(tsInsertCounter, &sampleKeys, &vgIds, &nKeys);
  if (rc) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (int k = 0; k < nKeys; k++) {
    int32_t vgId = vgIds[k];
    // a vgroup that is still running keeps its sample
    if (taosHashGet(pMgmt->runngingHash, &vgId, sizeof(int32_t)) != NULL) continue;

    rc = taos_counter_delete(tsInsertCounter, sampleKeys[k]);
    if (rc) {
      dError("failed to delete monitor sample key:%s", sampleKeys[k]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (vgIds) taosMemoryFree(vgIds);
  if (sampleKeys) taosMemoryFree(sampleKeys);
}
163

164
/*
 * Drop metrics that belong to vgroups which are no longer running on this dnode.
 *
 * Does nothing unless monitoring and metrics are enabled and a monitor endpoint
 * (fqdn and port) is configured. The ids of all running vnodes are snapshotted
 * into a temporary hash under the read lock, and cleanupExpiredMetrics() is then
 * called with that set after the lock has been released.
 */
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Create the scratch set before taking the lock or starting the iteration, so
  // that an allocation failure cannot leave a started iteration behind.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
  while (pIter != NULL) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode && *ppVnode) {
      int32_t vgId = (*ppVnode)->vgId;
      char    dummy = 1;  // hash table value (we only care about the key)
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
      }
    }
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
198

199
/*
 * Build the vnode configuration for a create-vnode request.
 *
 * pCfg is first overwritten with vnodeCfgDefault and then filled field by field
 * from pCreate: storage/cache sizes, tsdb settings, retentions, WAL settings,
 * encryption settings (enterprise builds only; otherwise forced to 0), hash
 * range and the sync membership (voters first, then learners).
 *
 * At most as many retentions as fit into pCfg->tsdbCfg.retentions are copied.
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;

  // Never copy more retentions than the destination array can hold.
  const size_t maxRetentions = sizeof(pCfg->tsdbCfg.retentions) / sizeof(pCfg->tsdbCfg.retentions[0]);
  size_t       numRetentions = taosArrayGetSize(pCreate->pRetensions);
  if (numRetentions > maxRetentions) numRetentions = maxRetentions;
  for (size_t i = 0; i < numRetentions; ++i) {
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
    if (i == 0) {
      // only the first retention decides whether the vnode is flagged as rsma
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
    }
  }
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgorithm = 0;
#endif

  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->walCfg.encryptAlgorithm = 0;
#endif

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tdbEncryptAlgorithm = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum counts them as they are added.
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // Learners follow the voters. Inside this loop totalReplicaNum is used as the
  // learner index; the voter count is added once the loop has finished.
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
9,879✔
306

307
/*
 * Fill the wrapper configuration for a vnode that is about to be created:
 * ids and version come from the request, the vnode is marked as not dropped,
 * and the path is "<pMgmt->path><sep>vnode<vgId>".
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = vgId;
}
9,877✔
313

314
/*
 * For a TSMA vnode, let smaGetTSmaDays() adjust pCfg->tsdbCfg.days from the
 * TSMA message attached to the request (the payload that follows its SMsgHead).
 *
 * Returns 0 for non-TSMA requests, otherwise the result of smaGetTSmaDays().
 */
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *pHead = pReq->pTsma;
  // payload length = total length carried in the header minus the header itself
  uint32_t bodyLen = (uint32_t)(htonl(pHead->contLen) - sizeof(SMsgHead));
  return smaGetTSmaDays(pCfg, POINTER_SHIFT(pHead, sizeof(SMsgHead)), bodyLen, &pCfg->tsdbCfg.days);
}
322

323
/*
 * Handle a create-vnode request.
 *
 * Steps: deserialize the request, check that the replica entry addressed by
 * selfIndex/learnerSelfIndex describes this dnode, check the encryption key when
 * SM4 is requested, build the vnode and wrapper configs, then create, open and
 * start the vnode and persist the vnode list.
 *
 * Returns 0 on success and also when the vnode already exists; otherwise an
 * error code. On the paths that go through _OVER, terrno is set to the
 * returned code.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};
  // Declared up front and NULL-initialised: `goto _OVER` may run before the
  // vnode is opened, and the cleanup code passes this pointer to vnodeClose().
  SVnode *pImpl = NULL;

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // log the learner's own dnode id (not the id of the voter at the same index)
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Locate the replica entry that is supposed to describe this dnode.
  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else if (req.learnerSelfIndex != -1) {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  } else {
    // neither index is set: there is no entry to look at
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, neither selfIndex nor learnerSelfIndex is set, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // log first: pReplica points into req, which is released right after
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgorithm == DND_CA_SM4) {
    if (strlen(tsEncryptKey) == 0) {
      (void)tFreeSCreateVnodeReq(&req);
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
      return code;
    }
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
    goto _OVER;
  }

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    code = TSDB_CODE_VND_ALREADY_EXIST;
    // an already existing vnode is reported to the caller as success
    return 0;
  }
  // NOTE(review): when pVnode is non-NULL here (a failed vnode with replica > 1)
  // it is only released on the vnodeCreate() failure path below - confirm that
  // the remaining paths are not expected to release it as well.

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    // path is still empty when the failure happened before vnodeCreate() was
    // reached; nothing was created for this request then, so nothing is destroyed
    if (path[0] != '\0') {
      vnodeDestroy(0, path, pMgmt->pTfs, 0);
    }
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
465

466
#ifdef USE_MOUNT
467
/* Sort key used for mounted vnodes: ordered by dbId first, then by vgId. */
typedef struct {
  int64_t dbId;         // database id
  int32_t vgId;         // vgroup id
  int32_t diskPrimary;  // primary disk index recorded for this vgroup
} SMountDbVgId;
472
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
473
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
474

475
static int compareVnodeInfo(const void *p1, const void *p2) {
21✔
476
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
21✔
477
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
21✔
478

479
  if (v1->config.dbId == v2->config.dbId) {
21✔
480
    if (v1->config.vgId == v2->config.vgId) {
12!
481
      return 0;
×
482
    }
483
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
12!
484
  }
485

486
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
9!
487
}
488
static int compareVgDiskPrimary(const void *p1, const void *p2) {
21✔
489
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
21✔
490
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
21✔
491

492
  if (v1->dbId == v2->dbId) {
21✔
493
    if (v1->vgId == v2->vgId) {
12!
494
      return 0;
×
495
    }
496
    return v1->vgId > v2->vgId ? 1 : -1;
12!
497
  }
498

499
  return v1->dbId > v2->dbId ? 1 : -1;
9!
500
}
501

502
/*
 * Read the dnode-level information of a mount source into pMountInfo.
 *
 * Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount when
 *        the dnode is marked dropped, when encryptScope is non-zero or when
 *        clusterId is 0; otherwise the cluster id is stored in pMountInfo.
 * Step 2 obtains the data disks through vmGetMountDisks() and stores a copy of
 *        every directory in pMountInfo->pDisks[level]; a primary disk is placed
 *        at the head of level 0 when level 0 already has entries.
 *
 * Returns 0 on success, otherwise an error code (also logged together with the
 * line where it was raised). Entries already stored in pMountInfo->pDisks are
 * not released here on failure.
 */
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;
  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // A short read must never fall through to the parser, even when terrno
    // was not set by the failing read.
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_INVALID_JSON_FORMAT);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;
  // release the step-1 resources early; _exit releases them again only if still set
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);
  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
    // the level comes from the mount source's configuration and is used as an
    // index into pMountInfo->pDisks[], so it has to be range-checked first
    if (pDisk->level < 0 || pDisk->level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d", pReq->mountName, (int32_t)pDisk->level);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
    if (!pMountInfo->pDisks[pDisk->level]) {
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[pDisk->level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  // level 0 needs at least one disk; every level is capped at TFS_MAX_DISKS_PER_TIER
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
599

600
/**
 * Collect the vnodes found under a mount path and group them per database.
 *
 * Steps:
 *  1. read the vnode list file under "<mountPath>/<vnode dir>";
 *  2. for every vnode that is not marked as dropped: check r/w access to its directory, load its info and reject
 *     multi-replica vnodes, vgId mismatches and encrypted vnodes;
 *  3. sort the loaded vnodes, verify that each one carries the clusterId stored in pMountInfo and that no vgId
 *     appears twice in a row;
 *  4. build one SMountDbInfo per distinct dbId, each holding the SMountVgInfo of its vnodes.
 *
 * @param pMgmt      vnode management handle (not used by this function)
 * @param pReq       retrieve request; mountName, mountPath and dnodeId are read
 * @param pMountInfo in: clusterId and pDisks[0]; out: pDbs, assigned only when the whole function succeeds
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;

  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  // pVgCfgs is pre-sized to numOfVnodes slots; only the first j slots get filled by the loop below
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, j = 0;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char **ppDiskDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!ppDiskDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *ppDiskDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
               pInfo->config.walCfg.encryptAlgorithm) {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
             pInfo->config.tsdbCfg.encryptAlgorithm);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // Dropped vnodes never consumed a slot, so the last nVgDropped slots (starting at index j) are unused: trim them
    // so that pVgCfgs and pDiskPrimarys have the same length.
    taosArrayRemoveBatch(pVgCfgs, j, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    // both arrays are sorted so that entry i of one corresponds to entry i of the other in the grouping loop below
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    // Count a new db on the first entry and on every dbId change. The "i == 0" test keeps this count identical to
    // the number of SMountDbInfo slots consumed by the grouping loop below, even if the first dbId equals 0.
    if (i == 0 || dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    if (vgId == pInfo->config.vgId) {
      // same vgId twice in a row (or a vgId equal to the initial value 0)
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    int32_t dbIdx = -1;
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        // first vnode of a new db: claim the next SMountDbInfo slot
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  // hand the db list over to the caller; clear the local pointer so the cleanup below does not touch it
  pMountInfo->pDbs = pDbInfos;
  pDbInfos = NULL;

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  }
  if (pDbInfos != NULL) {
    // only reached when the db list was built partially and never handed over: release it with its vgroup arrays
    int32_t nDbInfo = taosArrayGetSize(pDbInfos);
    for (int32_t i = 0; i < nDbInfo; ++i) {
      SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
      taosArrayDestroy(pDbInfo->pVgs);
      pDbInfo->pVgs = NULL;
    }
    taosArrayDestroy(pDbInfos);
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
771

772
/**
773
 *   Retrieve the stables from vnode meta.
774
 */
775
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
2✔
776
  int32_t code = 0, lino = 0;
2✔
777
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
2✔
778
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
2✔
779
  SArray *suidList = NULL;
2✔
780
  SArray *pCols = NULL;
2✔
781
  SArray *pTags = NULL;
2✔
782
  SArray *pColExts = NULL;
2✔
783
  SArray *pTagExts = NULL;
2✔
784

785
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
2✔
786
  for (int32_t i = 0; i < nDb; ++i) {
6✔
787
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
4✔
788
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
4✔
789
    for (int32_t j = 0; j < nVg; ++j) {
4!
790
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
4✔
791
      SVnode        vnode = {
4✔
792
                 .config.vgId = pVgInfo->vgId,
4✔
793
                 .config.dbId = pVgInfo->dbId,
4✔
794
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
4✔
795
                 .config.szPage = pVgInfo->szPage,
4✔
796
                 .config.szCache = pVgInfo->szCache,
4✔
797
                 .config.szBuf = pVgInfo->szBuf,
4✔
798
                 .config.cacheLast = pVgInfo->cacheLast,
4✔
799
                 .config.standby = pVgInfo->standby,
4✔
800
                 .config.hashMethod = pVgInfo->hashMethod,
4✔
801
                 .config.hashBegin = pVgInfo->hashBegin,
4✔
802
                 .config.hashEnd = pVgInfo->hashEnd,
4✔
803
                 .config.hashPrefix = pVgInfo->hashPrefix,
4✔
804
                 .config.hashSuffix = pVgInfo->hashSuffix,
4✔
805
                 .config.sttTrigger = pVgInfo->sttTrigger,
4✔
806
                 .config.syncCfg.replicaNum = pVgInfo->replications,
4✔
807
                 .config.tsdbCfg.precision = pVgInfo->precision,
4✔
808
                 .config.tsdbCfg.compression = pVgInfo->compression,
4✔
809
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
4✔
810
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
4✔
811
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
4✔
812
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
4✔
813
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
4✔
814
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
4✔
815
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
4✔
816
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
4✔
817
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
4✔
818
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
4✔
819
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
4✔
820
                 .config.ssCompact = pVgInfo->ssCompact,
4✔
821
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
4✔
822
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
4✔
823
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
4✔
824
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
4✔
825
                 .config.walCfg.segSize = pVgInfo->walSegSize,
4✔
826
                 .config.walCfg.level = pVgInfo->walLevel,
4✔
827
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
4✔
828
                 .diskPrimary = pVgInfo->diskPrimary,
4✔
829
      };
830
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
4✔
831
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
4✔
832
               pVgInfo->vgId);
833
      vnode.path = path;
4✔
834

835
      int32_t rollback = vnodeShouldRollback(&vnode);
4✔
836
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
4!
837
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
838
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
839
        TAOS_CHECK_EXIT(code);
×
840
      } else {
841
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
4!
842
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
843

844
        SMetaReader mr = {0};
4✔
845
        tb_uid_t    suid = 0;
4✔
846
        SMeta      *pMeta = vnode.pMeta;
4✔
847

848
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
4✔
849
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
4!
850
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
851
        }
852
        taosArrayClear(suidList);
4✔
853
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
4!
854
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
4!
855
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
856
        int32_t nStbs = taosArrayGetSize(suidList);
4✔
857
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
4!
858
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
859
        }
860
        for (int32_t i = 0; i < nStbs; ++i) {
16✔
861
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
12✔
862
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
12!
863
                pVgInfo->dbId, suid, pReq->dnodeId);
864
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
12!
865
            TSDB_CHECK_CODE(code, lino, _exit0);
×
866
          }
867
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
12!
868
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
12!
869
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
870
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
871
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
872
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
873
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
874
          }
875
          SMountStbInfo stbInfo = {
12✔
876
              .req.source = TD_REQ_FROM_APP,
877
              .req.suid = suid,
878
              .req.colVer = mr.me.stbEntry.schemaRow.version,
12✔
879
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
12✔
880
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
12✔
881
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
12✔
882
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
12✔
883
          };
884
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
12✔
885
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
12!
886
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
887
          }
888
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
12!
889
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
890
          }
891

892
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
12!
893
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
894
          }
895
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
12!
896
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
897
          }
898
          taosArrayClear(pCols);
12✔
899
          taosArrayClear(pTags);
12✔
900
          taosArrayClear(pColExts);
12✔
901
          taosArrayClear(pTagExts);
12✔
902
          stbInfo.req.pColumns = pCols;
12✔
903
          stbInfo.req.pTags = pTags;
12✔
904
          stbInfo.pColExts = pColExts;
12✔
905
          stbInfo.pTagExts = pTagExts;
12✔
906

907
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
72✔
908
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
60✔
909
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
60✔
910
            SFieldWithOptions col = {
60✔
911
                .type = pSchema->type,
60✔
912
                .flags = pSchema->flags,
60✔
913
                .bytes = pSchema->bytes,
60✔
914
                .compress = pColComp->alg,
60✔
915
            };
916
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
60✔
917
            if (pSchema->colId != pColComp->id) {
60!
918
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
919
            }
920
            if (mr.me.pExtSchemas) {
60!
921
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
922
            }
923
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
60!
924
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
120!
925
          }
926
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
28✔
927
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
16✔
928
            SField   tag = {
16✔
929
                  .type = pSchema->type,
16✔
930
                  .flags = pSchema->flags,
16✔
931
                  .bytes = pSchema->bytes,
16✔
932
            };
933
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
16✔
934
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
16!
935
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
32!
936
          }
937
          tDecoderClear(&mr.coder);
12✔
938

939
          // serialize the SMountStbInfo
940
          int32_t firstPartLen = 0;
12✔
941
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
12✔
942
          if (msgLen <= 0) {
12!
943
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
944
          }
945
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
12!
946
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
12!
947
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
12✔
948
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
12✔
949
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
12!
950
            taosMemoryFree(pBuf);
×
951
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
952
          }
953
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
24!
954
            taosMemoryFree(pBuf);
×
955
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
956
          }
957
        }
958
      _exit0:
4✔
959
        metaReaderClear(&mr);
4✔
960
        metaClose(&vnode.pMeta);
4✔
961
        TAOS_CHECK_EXIT(code);
4!
962
      }
963
      break;  // retrieve stbs from one vnode is enough
4✔
964
    }
965
  }
966
_exit:
2✔
967
  if (code != 0) {
2!
968
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
969
           pReq->dnodeId, tstrerror(code), path);
970
  }
971
  taosArrayDestroy(suidList);
2✔
972
  taosArrayDestroy(pCols);
2✔
973
  taosArrayDestroy(pTags);
2✔
974
  taosArrayDestroy(pColExts);
2✔
975
  taosArrayDestroy(pTagExts);
2✔
976
  TAOS_RETURN(code);
2✔
977
}
978

979
/**
 * Open (creating/truncating) "<mountPath>/.running" and try to lock it.
 *
 * The lock is attempted at least once. After each failed attempt the function sleeps one second, logs the failure
 * and tries again until the number of failed attempts reaches retryLimit.
 *
 * @param mountName  mount name, used for logging only
 * @param mountPath  directory in which the ".running" file is placed
 * @param pFile      out: the opened file on success; closed and reset to NULL on failure
 * @param retryLimit upper bound for the number of failed lock attempts
 * @return 0 on success, otherwise the error code of the open or of the last lock attempt
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0, lino = 0;
  int32_t nFailed = 0;
  int32_t lockRet = 0;
  char    filepath[PATH_MAX] = {0};

  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);

  *pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC);
  TSDB_CHECK_NULL(*pFile, code, lino, _exit, terrno);

  for (;;) {
    lockRet = taosLockFile(*pFile);
    if (lockRet == 0) {
      break;
    }
    taosMsleep(1000);
    nFailed++;
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(lockRet), nFailed);
    if (nFailed >= retryLimit) {
      break;
    }
  }
  TAOS_CHECK_EXIT(lockRet);

_exit:
  if (code != 0) {
    // do not hand a half-initialized handle back to the caller
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           filepath);
  }
  TAOS_RETURN(code);
}
1005

1006
/**
 * Sanity checks performed before a mount path is inspected.
 *
 * Verifies, in order, that the mount path itself is accessible, that its ".running" lock can be taken (the locked
 * file is stored in pMountInfo->pFile), and that dnode.json, the mnode directory, the vnode directory and
 * config/local.json exist below the mount path and are readable.
 *
 * @param pMgmt      vnode management handle (not used by this function)
 * @param pReq       retrieve request; mountName, mountPath and dnodeId are read
 * @param pMountInfo out: pFile receives the locked ".running" file
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
  // taosCheckAccessFile takes a TD_FILE_ACCESS_* bitmask, not an open(2) flag such as O_RDONLY
  const int32_t accessOpts = TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK;

  // keep `path` in sync with the item being checked so the error log below names the failing path
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, accessOpts), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessOpts), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessOpts), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessOpts), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE),
                 TD_DIRSEP, TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessOpts), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1027

1028
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
6✔
1029
                                       SMountInfo *pMountInfo) {
1030
  int32_t code = 0, lino = 0;
6✔
1031
  pMountInfo->dnodeId = pReq->dnodeId;
6✔
1032
  pMountInfo->mountUid = pReq->mountUid;
6✔
1033
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
6✔
1034
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
6✔
1035
  pMountInfo->ignoreExist = pReq->ignoreExist;
6✔
1036
  pMountInfo->valLen = pReq->valLen;
6✔
1037
  pMountInfo->pVal = pReq->pVal;
6✔
1038
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
6✔
1039
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
3!
1040
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
3✔
1041
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
2!
1042
_exit:
2✔
1043
  if (code != 0) {
6✔
1044
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
4!
1045
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1046
  }
1047
  TAOS_RETURN(code);
6✔
1048
}
1049

1050
/**
 * Handle a retrieve-mount-path request.
 *
 * The request is decoded and the mount is inspected on a private copy of the management struct. Whatever the
 * outcome of the inspection, the (possibly partially filled) SMountInfo is serialized into the rpc response; two
 * error codes are therefore tracked:
 *   - code:    failure of decoding/inspection; the response is still built and set on pMsg;
 *   - rspCode: failure while building the response; no response is set and rspCode becomes the return value.
 *
 * @param pMgmt vnode management handle
 * @param pMsg  rpc message; pCont/contLen are read, info.rsp/info.rspLen are written
 * @return 0 on success, otherwise an error code
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0, lino = 0;
  int32_t               rspCode = 0;
  SMountInfo            mountInfo = {0};
  SRetrieveMountPathReq req = {0};
  void                 *pBuf = NULL;
  int32_t               bufLen = 0;
  SVnodeMgmt            vndMgmt = *pMgmt;  // private copy; its path is detached from the original

  vndMgmt.path = NULL;

  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // first pass computes the encoded size, second pass encodes into the rpc buffer
  bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo);
  TSDB_CHECK_CONDITION(bufLen >= 0, rspCode, lino, _exit, bufLen);
  pBuf = rpcMallocCont(bufLen);
  TSDB_CHECK_CONDITION(pBuf, rspCode, lino, _exit, terrno);
  bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo);
  TSDB_CHECK_CONDITION(bufLen >= 0, rspCode, lino, _exit, bufLen);
  pMsg->info.rsp = pBuf;
  pMsg->info.rspLen = bufLen;

_exit:
  if (rspCode == 0 && code == 0) {
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
    int32_t nVgs = 0;
    for (int32_t iDb = 0; iDb < nDbs; ++iDb) {
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, iDb);
      nVgs += taosArrayGetSize(pDb->pVgs);
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
  } else if (rspCode != 0) {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pBuf);
    code = rspCode;
  } else {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  }
  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1094

1095
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
8✔
1096
                            SMountVnodeReq *req, STfs *pMountTfs) {
1097
  int32_t    code = 0;
8✔
1098
  SVnodeInfo info = {0};
8✔
1099
  char       hostDir[TSDB_FILENAME_LEN] = {0};
8✔
1100
  char       mountDir[TSDB_FILENAME_LEN] = {0};
8✔
1101
  char       mountVnode[32] = {0};
8✔
1102

1103
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
8!
1104
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1105
    return code;
×
1106
  }
1107

1108
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
8✔
1109
  if ((code = taosMkDir(hostDir))) {
8!
1110
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1111
           tstrerror(code), hostDir);
1112
    return code;
×
1113
  }
1114

1115
  info.config = *pCfg;  // copy the config
8✔
1116
  info.state.committed = req->committed;
8✔
1117
  info.state.commitID = req->commitID;
8✔
1118
  info.state.commitTerm = req->commitTerm;
8✔
1119
  info.state.applied = req->committed;
8✔
1120
  info.state.applyTerm = req->commitTerm;
8✔
1121
  info.config.vndStats.numOfSTables = req->numOfSTables;
8✔
1122
  info.config.vndStats.numOfCTables = req->numOfCTables;
8✔
1123
  info.config.vndStats.numOfNTables = req->numOfNTables;
8✔
1124

1125
  SVnodeInfo oldInfo = {0};
8✔
1126
  oldInfo.config = vnodeCfgDefault;
8✔
1127
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
8!
1128
    if (oldInfo.config.dbId != info.config.dbId) {
×
1129
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1130
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1131
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1132
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1133
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1134
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1135

1136
    } else {
1137
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1138
    }
1139
    return code;
×
1140
  }
1141

1142
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1143
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
8✔
1144
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
8✔
1145
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
8✔
1146
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1147
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
48✔
1148
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1149
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
40✔
1150
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
40!
1151
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1152
             mountSubDir, hostSubDir, tstrerror(code));
1153
      return code;
×
1154
    }
1155
  }
1156
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
8!
1157
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
8!
1158
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1159
           req->mountName, tstrerror(code), hostDir);
1160
    return code;
×
1161
  }
1162
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
8!
1163
  return 0;
8✔
1164
}
1165

1166
/*
 * Handle a mount-vnode request: deserialize it, check that it targets this dnode,
 * link the mounted vnode into the host data directory, then open and start the
 * vnode and persist the vnode and mount lists.
 *
 * Returns TSDB_CODE_INVALID_MSG when the request cannot be decoded, carries
 * invalid replica indexes, or does not describe the local dnode. Returns 0
 * without doing anything when the vnode already exists (see the condition
 * below). Otherwise returns 0 on success or the code of the first failing
 * step; in that case the partially created vnode is torn down under `_exit`.
 * pMsg->code is set only on the paths that reach `_exit`.
 */
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // Log the learner's own dnode id (previously replicas[i].id was printed here).
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  // Same index validation as the alter-replica handlers: never index the replica
  // arrays with a negative or out-of-range self index.
  if (pCreateReq->replica <= 0 || (pCreateReq->selfIndex < 0 && pCreateReq->learnerSelfIndex < 0) ||
      pCreateReq->selfIndex >= pCreateReq->replica || pCreateReq->learnerSelfIndex >= pCreateReq->learnerReplica) {
    code = TSDB_CODE_INVALID_MSG;
    dError("mount:%s, vgId:%d, failed to mount vnode since invalid replica index, reason:%s", req.mountName,
           pCreateReq->vgId, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (pCreateReq->selfIndex >= 0) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // Log first: the message reads fields of `req`, so release the request only afterwards.
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): the original assigned TSDB_CODE_VND_ALREADY_EXIST to `code` and then
    // returned 0; the return value is kept as-is so an existing vnode is still reported as
    // success to the caller.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    // Undo whatever was created before the failure.
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1259
#endif  // USE_MOUNT
1260

1261
// alter replica doesn't use this, but restore dnode still uses this
1262
/*
 * Handle an alter-vnode-type request for a vnode hosted on this dnode.
 *
 * The request is rejected (return -1, terrno set) when it cannot be decoded, the
 * vnode does not exist, the vnode already has the voter role, the vnode has not
 * caught up yet, the replica indexes are invalid, or the selected replica entry
 * does not describe the local dnode. Otherwise the vnode is closed, its replica
 * configuration is rewritten on disk with vnodeAlterReplica(), and it is reopened
 * and started.
 *
 * Returns 0 on success and -1 on failure. For failures after the vnode has been
 * closed, the error logged is whatever terrstr() reports at that point.
 */
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count, not the learnerReplicas array (an array never compares equal to 0).
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // At least one of the two self indexes must be set, and both must stay inside their arrays.
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot what is needed to reopen the vnode; pVnode is not used after vmCloseVnode().
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
1381

1382
/*
 * Handle a check-learner-catchup request: report whether the given vnode is a
 * non-voter that has caught up.
 *
 * Returns 0 when the vnode exists, does not have the voter role and
 * vnodeIsCatchUp() reports 1. Returns -1 with terrno set to
 * TSDB_CODE_INVALID_MSG (undecodable request), TSDB_CODE_VND_NOT_EXIST,
 * TSDB_CODE_VND_ALREADY_IS_VOTER or TSDB_CODE_VND_NOT_CATCH_UP otherwise.
 * The vnode itself is not modified.
 */
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count, not the learnerReplicas array (an array never compares equal to 0).
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1429

1430
/*
 * Handle a disable-vnode-write request: copy the request's `disable` value onto
 * the matching vnode object.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_INVALID_MSG when
 * the request cannot be decoded, or to TSDB_CODE_VND_NOT_EXIST when the vnode
 * cannot be acquired.
 */
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq disableReq = {0};

  int32_t decodeRc = tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &disableReq);
  if (decodeRc != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", disableReq.vgId, disableReq.disable);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, disableReq.vgId);
  if (pTarget != NULL) {
    pTarget->disable = disableReq.disable;
    vmReleaseVnode(pMgmt, pTarget);
    return 0;
  }

  // Nothing was acquired, so there is nothing to release on this path.
  dError("vgId:%d, failed to disable write since %s", disableReq.vgId, terrstr());
  terrno = TSDB_CODE_VND_NOT_EXIST;
  return -1;
}
1451

1452
/*
 * Handle an alter-hash-range request, which turns the source vnode (srcVgId)
 * into a vnode with a new id (dstVgId).
 *
 * Sequence: verify dstVgId is not in use, acquire the source vnode, record
 * `toVgId` and persist the vnode list ("prepare"), close the source vnode, call
 * vnodeAlterHashRange() on the two paths, open and start the vnode under the
 * destination path, and persist the vnode list again ("complete").
 *
 * Returns 0 on success and -1 on failure. terrno is set explicitly for an
 * undecodable request (TSDB_CODE_INVALID_MSG), an existing destination
 * (TSDB_CODE_VND_ALREADY_EXIST) and a missing source (TSDB_CODE_VND_NOT_EXIST);
 * on the other failure paths it is left as the failing callee reported it.
 */
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Snapshot what is needed to reopen the vnode under its new id.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // The source vnode is still open here: undo the in-memory change and drop the
    // reference taken above, as the drop handler does when its list write fails.
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1537

1538
/*
 * Handle an alter-vnode-replica request for a vnode hosted on this dnode.
 *
 * The request is validated first (decodable, consistent replica indexes, the
 * selected replica entry describes the local dnode). The vnode is then closed,
 * its replica configuration is rewritten with vnodeAlterReplica(), and it is
 * reopened and started.
 *
 * Returns 0 on success and -1 on failure. terrno is set to TSDB_CODE_INVALID_MSG
 * for validation failures and to TSDB_CODE_VND_NOT_EXIST when the vnode cannot
 * be acquired; on later failures it is left as the failing callee reported it.
 */
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  // The trailing "dnode:%d" field is the replica's dnode id (previously the port was printed twice).
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // At least one of the two self indexes must be set, and both must stay inside their arrays.
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot what is needed to reopen the vnode; pVnode is not used after vmCloseVnode().
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1641

1642
/*
 * Handle a drop-vnode request.
 *
 * The vnode is flagged as dropped and the vnode list is persisted before the
 * vnode is closed; if that first write fails the flag is cleared again and the
 * write's error code is returned. A failure of the second write, after the
 * close, is only logged.
 *
 * Returns 0 on success, otherwise terrno (TSDB_CODE_INVALID_MSG for an
 * undecodable request or a dnode id that is not the local one,
 * TSDB_CODE_VND_NOT_EXIST when the vnode cannot be acquired) or the code
 * returned by the first vnode-list write.
 */
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq request = {0};
  int32_t       decodeRc = tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &request);
  if (decodeRc != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  int32_t targetVgId = request.vgId;
  dInfo("vgId:%d, start to drop vnode", targetVgId);

  bool forLocalDnode = (request.dnodeId == pMgmt->pData->dnodeId);
  if (!forLocalDnode) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d not matched with local dnode", request.vgId, request.dnodeId);
    return terrno;
  }

  SVnodeObj *pTarget = vmAcquireVnodeImpl(pMgmt, targetVgId, false);
  if (pTarget == NULL) {
    dInfo("vgId:%d, failed to drop since %s", targetVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // Persist the dropped flag before closing; undo it if the list cannot be written.
  pTarget->dropped = 1;
  int32_t writeRc = vmWriteVnodeListToFile(pMgmt);
  if (writeRc != 0) {
    pTarget->dropped = 0;
    vmReleaseVnode(pMgmt, pTarget);
    return writeRc;
  }

  vmCloseVnode(pMgmt, pTarget, false, false);

  // A failure of this second write is logged but does not fail the request.
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", targetVgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", targetVgId);
  return 0;
}
1681

1682
/*
 * Handle an arbitrator heartbeat request.
 *
 * For every member listed in the request whose vnode can be acquired, the
 * vnode's arb token is read and its arb term is updated; members for which any
 * of these steps fails are logged and left out of the response. The serialized
 * response is attached to pMsg->info.rsp / rspLen.
 *
 * Returns -1 (terrno = TSDB_CODE_INVALID_MSG) when the request cannot be
 * decoded; otherwise returns terrno, which is TSDB_CODE_SUCCESS once the
 * response has been attached.
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // NOTE(review): terrno is not set here; presumably taosArrayInit reports the failure through it - confirm.
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // terrno is returned as-is (the original only re-assigned it to itself here).
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
1771

1772
SArray *vmGetMsgHandles() {
2,732✔
1773
  int32_t code = -1;
2,732✔
1774
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,732✔
1775
  if (pArray == NULL) goto _OVER;
2,732!
1776

1777
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1778
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1779
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1780
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1781
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1782
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1783
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1784
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1785
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1786
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1787
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1788
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1789
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1790
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1791
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1792
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1793
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1794
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1795
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1796
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1797
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1798
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1799
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1800
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1801
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1802
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1803
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1804
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1805
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1806
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1807
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,732!
1812
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1813
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1814
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1815
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1816
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1817
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1818
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,732!
1824
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1825

1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1832
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1834
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1835
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1836
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1837
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1838
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1839
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1840
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1841
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1842

1843
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1844
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1845
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1846
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1847
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1848
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1849
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1850
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1851
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1852
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1853
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1854
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1855

1856
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,732!
1857
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,732!
1858
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,732!
1859
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,732!
1860
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,732!
1861

1862
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,732!
1863
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,732!
1864
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,732!
1865

1866
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
2,732!
1867
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
2,732!
1868
  code = 0;
2,732✔
1869

1870
_OVER:
2,732✔
1871
  if (code != 0) {
2,732!
1872
    taosArrayDestroy(pArray);
×
1873
    return NULL;
×
1874
  } else {
1875
    return pArray;
2,732✔
1876
  }
1877
}
1878

1879
/**
 * Push the raw write metrics of every running vnode into the metrics module.
 *
 * Iterates pMgmt->runngingHash while holding the read side of pMgmt->hashLock.
 * For each vnode that is not marked failed, the function:
 *   1. reads the vnode's raw write metrics via vnodeGetRawWriteMetrics(),
 *   2. parses the vnode's configured db name with tNameFromString(),
 *   3. hands the metrics to addWriteMetrics(), and
 *   4. on success calls vnodeResetRawWriteMetrics() with the reported snapshot.
 * A failure at any step is logged and the vnode is skipped; the remaining
 * vnodes are still processed.
 *
 * @param pMgmt     vnode management object; supplies the running-vnode hash,
 *                  its lock, and the dnode id (pMgmt->pData->dnodeId).
 * @param clusterId cluster id passed through unchanged to addWriteMetrics().
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header on purpose: every `continue`
  // below must move on to the next entry. Advancing only at the bottom of a
  // while-body would make a `continue` spin forever on the same entry with
  // the read lock held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    // addWriteMetrics() is given name.dbname, i.e. the db part extracted from
    // the vnode's configured dbname string.
    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // Only reset after the metrics were accepted, so a failed report does not
    // discard the vnode's accumulated values.
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc