• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.1
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
162✔
30
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
162✔
31
  if (pInfo->pVloads == NULL) return;
162!
32

33
  tfsUpdateSize(pMgmt->pTfs);
162✔
34

35
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
162✔
36

37
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
162✔
38
  while (pIter) {
1,054✔
39
    SVnodeObj **ppVnode = pIter;
892✔
40
    if (ppVnode == NULL || *ppVnode == NULL) continue;
892!
41

42
    SVnodeObj *pVnode = *ppVnode;
892✔
43
    SVnodeLoad vload = {.vgId = pVnode->vgId};
892✔
44
    if (!pVnode->failed) {
892!
45
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
892!
46
        dError("failed to get vnode load");
×
47
      }
48
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
892!
49
    }
50
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
1,784!
51
      dError("failed to push vnode load");
×
52
    }
53
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
892✔
54
  }
55

56
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
162✔
57
}
58

59
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
60
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
61
  if (!pInfo->pVloads) return;
×
62

63
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
64

65
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
66
  while (pIter) {
×
67
    SVnodeObj **ppVnode = pIter;
×
68
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
69

70
    SVnodeObj *pVnode = *ppVnode;
×
71
    if (!pVnode->failed) {
×
72
      SVnodeLoadLite vload = {0};
×
73
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
74
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
75
          taosArrayDestroy(pInfo->pVloads);
×
76
          pInfo->pVloads = NULL;
×
77
          break;
×
78
        }
79
      }
80
    }
81
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
82
  }
83

84
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
85
}
86

87
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
×
88
  SMonVloadInfo vloads = {0};
×
89
  vmGetVnodeLoads(pMgmt, &vloads, true);
×
90

91
  SArray *pVloads = vloads.pVloads;
×
92
  if (pVloads == NULL) return;
×
93

94
  int32_t totalVnodes = 0;
×
95
  int32_t masterNum = 0;
×
96
  int64_t numOfSelectReqs = 0;
×
97
  int64_t numOfInsertReqs = 0;
×
98
  int64_t numOfInsertSuccessReqs = 0;
×
99
  int64_t numOfBatchInsertReqs = 0;
×
100
  int64_t numOfBatchInsertSuccessReqs = 0;
×
101

102
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
×
103
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
×
104
    numOfSelectReqs += pLoad->numOfSelectReqs;
×
105
    numOfInsertReqs += pLoad->numOfInsertReqs;
×
106
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
×
107
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
×
108
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
×
109
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
110
      masterNum++;
×
111
    }
112
    totalVnodes++;
×
113
  }
114

115
  pInfo->vstat.totalVnodes = totalVnodes;
×
116
  pInfo->vstat.masterNum = masterNum;
×
117
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
×
118
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
×
119
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
×
120
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
×
121
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
×
122
  pMgmt->state.totalVnodes = totalVnodes;
×
123
  pMgmt->state.masterNum = masterNum;
×
124
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
×
125
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
×
126
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
×
127
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
×
128
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
×
129

130
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
×
131
    dError("failed to get tfs monitor info");
×
132
  }
133
  taosArrayDestroy(pVloads);
×
134
}
135

136
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
×
137
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
×
138
  if (list_size == 0) return;
×
139
  int32_t *vgroup_ids;
140
  char   **keys;
141
  int      r = 0;
×
142
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
143
  if (r) {
×
144
    dError("failed to get vgroup ids");
×
145
    return;
×
146
  }
147
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
148
  for (int i = 0; i < list_size; i++) {
×
149
    int32_t vgroup_id = vgroup_ids[i];
×
150
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
151
    if (vnode == NULL) {
×
152
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
153
      if (r) {
×
154
        dError("failed to delete monitor sample key:%s", keys[i]);
×
155
      }
156
    }
157
  }
158
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
159
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
160
  if (keys) taosMemoryFree(keys);
×
161
  return;
×
162
}
163

164
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
165
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
166
    return;
×
167
  }
168

169
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
170
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
171
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
172
  if (pValidVgroups == NULL) {
×
173
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
174
    return;
×
175
  }
176

177
  while (pIter != NULL) {
×
178
    SVnodeObj **ppVnode = pIter;
×
179
    if (ppVnode && *ppVnode) {
×
180
      int32_t vgId = (*ppVnode)->vgId;
×
181
      char    dummy = 1;  // hash table value (we only care about the key)
×
182
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
183
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
184
      }
185
    }
186
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
187
  }
188
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
189

190
  // Clean expired metrics by removing metrics for non-existent vgroups
191
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
192
  if (code != TSDB_CODE_SUCCESS) {
×
193
    dError("failed to clean expired metrics, code:%d", code);
×
194
  }
195

196
  taosHashCleanup(pValidVgroups);
×
197
}
198

199
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
40✔
200
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
40✔
201

202
  pCfg->vgId = pCreate->vgId;
40✔
203
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
40✔
204
  pCfg->dbId = pCreate->dbUid;
40✔
205
  pCfg->szPage = pCreate->pageSize * 1024;
40✔
206
  pCfg->szCache = pCreate->pages;
40✔
207
  pCfg->cacheLast = pCreate->cacheLast;
40✔
208
  pCfg->cacheLastSize = pCreate->cacheLastSize;
40✔
209
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
40✔
210
  pCfg->isWeak = true;
40✔
211
  pCfg->isTsma = pCreate->isTsma;
40✔
212
  pCfg->tsdbCfg.compression = pCreate->compression;
40✔
213
  pCfg->tsdbCfg.precision = pCreate->precision;
40✔
214
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
40✔
215
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
40✔
216
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
40✔
217
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
40✔
218
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
40✔
219
  pCfg->tsdbCfg.minRows = pCreate->minRows;
40✔
220
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
40✔
221
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
40!
222
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
×
223
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
×
224
    if (i == 0) {
×
225
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
×
226
    }
227
  }
228
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
229
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
40✔
230
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
40!
231
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
×
232
  }
233
#else
234
  pCfg->tsdbCfg.encryptAlgorithm = 0;
235
#endif
236

237
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
40✔
238
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
40✔
239
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
40✔
240
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
40✔
241
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
40✔
242
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
40✔
243
  pCfg->walCfg.level = pCreate->walLevel;
40✔
244
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
245
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
40✔
246
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
40!
247
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
×
248
  }
249
#else
250
  pCfg->walCfg.encryptAlgorithm = 0;
251
#endif
252

253
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
254
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
40✔
255
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
40!
256
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
×
257
  }
258
#else
259
  pCfg->tdbEncryptAlgorithm = 0;
260
#endif
261

262
  pCfg->sttTrigger = pCreate->sstTrigger;
40✔
263
  pCfg->hashBegin = pCreate->hashBegin;
40✔
264
  pCfg->hashEnd = pCreate->hashEnd;
40✔
265
  pCfg->hashMethod = pCreate->hashMethod;
40✔
266
  pCfg->hashPrefix = pCreate->hashPrefix;
40✔
267
  pCfg->hashSuffix = pCreate->hashSuffix;
40✔
268
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
40✔
269

270
  pCfg->ssChunkSize = pCreate->ssChunkSize;
40✔
271
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
40✔
272
  pCfg->ssCompact = pCreate->ssCompact;
40✔
273

274
  pCfg->standby = 0;
40✔
275
  pCfg->syncCfg.replicaNum = 0;
40✔
276
  pCfg->syncCfg.totalReplicaNum = 0;
40✔
277
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
40✔
278

279
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
40✔
280
  for (int32_t i = 0; i < pCreate->replica; ++i) {
80✔
281
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
40✔
282
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
40✔
283
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
40✔
284
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
40✔
285
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
40✔
286
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
40✔
287
    pCfg->syncCfg.replicaNum++;
40✔
288
  }
289
  if (pCreate->selfIndex != -1) {
40!
290
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
40✔
291
  }
292
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
40!
293
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
×
294
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
×
295
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
×
296
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
297
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
×
298
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
×
299
    pCfg->syncCfg.totalReplicaNum++;
×
300
  }
301
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
40✔
302
  if (pCreate->learnerSelfIndex != -1) {
40!
303
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
×
304
  }
305
}
40✔
306

307
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
40✔
308
  pCfg->vgId = pCreate->vgId;
40✔
309
  pCfg->vgVersion = pCreate->vgVersion;
40✔
310
  pCfg->dropped = 0;
40✔
311
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
40✔
312
}
40✔
313

314
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
40✔
315
  if (pReq->isTsma) {
40!
316
    SMsgHead *smaMsg = pReq->pTsma;
×
317
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
×
318
    return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
×
319
  }
320
  return 0;
40✔
321
}
322

323
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
40✔
324
  SCreateVnodeReq req = {0};
40✔
325
  SVnodeCfg       vnodeCfg = {0};
40✔
326
  SWrapperCfg     wrapperCfg = {0};
40✔
327
  int32_t         code = -1;
40✔
328
  char            path[TSDB_FILENAME_LEN] = {0};
40✔
329

330
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
40!
331
    return TSDB_CODE_INVALID_MSG;
×
332
  }
333

334
  if (req.learnerReplica == 0) {
40!
335
    req.learnerSelfIndex = -1;
40✔
336
  }
337

338
  dInfo(
40!
339
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
340
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
341
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
342
      "precision:%d compression:%d minRows:%d maxRows:%d"
343
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
344
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
345
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
346
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
347
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
348
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
349
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
350
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
351
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
352
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
353
      req.encryptAlgorithm);
354

355
  for (int32_t i = 0; i < req.replica; ++i) {
80✔
356
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
40!
357
          req.replicas[i].id);
358
  }
359
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
40!
360
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
×
361
          req.learnerReplicas[i].port, req.replicas[i].id);
362
  }
363

364
  SReplica *pReplica = NULL;
40✔
365
  if (req.selfIndex != -1) {
40!
366
    pReplica = &req.replicas[req.selfIndex];
40✔
367
  } else {
368
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
369
  }
370
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
40!
371
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
40!
372
    (void)tFreeSCreateVnodeReq(&req);
×
373
    code = TSDB_CODE_INVALID_MSG;
×
374
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
×
375
           pReplica->fqdn, pReplica->port, tstrerror(code));
376
    return code;
×
377
  }
378

379
  if (req.encryptAlgorithm == DND_CA_SM4) {
40!
380
    if (strlen(tsEncryptKey) == 0) {
×
381
      (void)tFreeSCreateVnodeReq(&req);
×
382
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
383
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
384
      return code;
×
385
    }
386
  }
387

388
  vmGenerateVnodeCfg(&req, &vnodeCfg);
40✔
389

390
  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
40!
391
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
×
392
    goto _OVER;
×
393
  }
394

395
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
40✔
396

397
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
40✔
398
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
40!
399
    dError("vgId:%d, already exist", req.vgId);
×
400
    (void)tFreeSCreateVnodeReq(&req);
×
401
    vmReleaseVnode(pMgmt, pVnode);
×
402
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
403
    return 0;
×
404
  }
405

406
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
40✔
407
  if (diskPrimary < 0) {
40!
408
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
40✔
409
  }
410
  wrapperCfg.diskPrimary = diskPrimary;
40✔
411

412
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
40✔
413

414
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
40!
415
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
416
    vmReleaseVnode(pMgmt, pVnode);
×
417
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
418
    (void)tFreeSCreateVnodeReq(&req);
×
419
    return code;
×
420
  }
421

422
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
40✔
423
  if (pImpl == NULL) {
40!
424
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
425
    code = terrno != 0 ? terrno : -1;
×
426
    goto _OVER;
×
427
  }
428

429
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
40✔
430
  if (code != 0) {
40!
431
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
432
    code = terrno != 0 ? terrno : code;
×
433
    goto _OVER;
×
434
  }
435

436
  code = vnodeStart(pImpl);
40✔
437
  if (code != 0) {
40!
438
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
439
    goto _OVER;
×
440
  }
441

442
  code = vmWriteVnodeListToFile(pMgmt);
40✔
443
  if (code != 0) {
40!
444
    code = terrno != 0 ? terrno : code;
×
445
    goto _OVER;
×
446
  }
447

448
_OVER:
40✔
449
  vmCleanPrimaryDisk(pMgmt, req.vgId);
40✔
450

451
  if (code != 0) {
40!
452
    vmCloseFailedVnode(pMgmt, req.vgId);
×
453

454
    vnodeClose(pImpl);
×
455
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
456
  } else {
457
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
40!
458
          TMSG_INFO(pMsg->msgType));
459
  }
460

461
  (void)tFreeSCreateVnodeReq(&req);
40✔
462
  terrno = code;
40✔
463
  return code;
40✔
464
}
465

466
#ifdef USE_MOUNT
467
typedef struct {
468
  int64_t dbId;
469
  int32_t vgId;
470
  int32_t diskPrimary;
471
} SMountDbVgId;
472
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
473
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
474

475
static int compareVnodeInfo(const void *p1, const void *p2) {
×
476
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
×
477
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
×
478

479
  if (v1->config.dbId == v2->config.dbId) {
×
480
    if (v1->config.vgId == v2->config.vgId) {
×
481
      return 0;
×
482
    }
483
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
×
484
  }
485

486
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
×
487
}
488
static int compareVgDiskPrimary(const void *p1, const void *p2) {
×
489
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
×
490
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
×
491

492
  if (v1->dbId == v2->dbId) {
×
493
    if (v1->vgId == v2->vgId) {
×
494
      return 0;
×
495
    }
496
    return v1->vgId > v2->vgId ? 1 : -1;
×
497
  }
498

499
  return v1->dbId > v2->dbId ? 1 : -1;
×
500
}
501

502
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
503
  int32_t   code = 0, lino = 0;
×
504
  TdFilePtr pFile = NULL;
×
505
  char     *content = NULL;
×
506
  SJson    *pJson = NULL;
×
507
  int64_t   size = 0;
×
508
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
×
509
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
×
510
  SArray   *pDisks = NULL;
×
511
  // step 1: fetch clusterId from dnode.json
512
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
×
513
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
×
514
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
×
515
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
×
516
  if (taosReadFile(pFile, content, size) != size) {
×
517
    TAOS_CHECK_EXIT(terrno);
×
518
  }
519
  content[size] = '\0';
×
520
  pJson = tjsonParse(content);
×
521
  if (pJson == NULL) {
×
522
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
523
  }
524
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
×
525
  TAOS_CHECK_EXIT(code);
×
526
  if (dropped == 1) {
×
527
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
528
  }
529
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
×
530
  TAOS_CHECK_EXIT(code);
×
531
  if (encryptScope != 0) {
×
532
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
533
  }
534
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
×
535
  TAOS_CHECK_EXIT(code);
×
536
  if (clusterId == 0) {
×
537
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
538
  }
539
  pMountInfo->clusterId = clusterId;
×
540
  if (content != NULL) taosMemoryFreeClear(content);
×
541
  if (pJson != NULL) {
×
542
    cJSON_Delete(pJson);
×
543
    pJson = NULL;
×
544
  }
545
  if (pFile != NULL) taosCloseFile(&pFile);
×
546
  // step 2: fetch dataDir from dnode/config/local.json
547
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
×
548
  int32_t nDisks = taosArrayGetSize(pDisks);
×
549
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
×
550
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
551
  }
552
  for (int32_t i = 0; i < nDisks; ++i) {
×
553
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
×
554
    if (!pMountInfo->pDisks[pDisk->level]) {
×
555
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
×
556
      if (!pMountInfo->pDisks[pDisk->level]) {
×
557
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
558
      }
559
    }
560
    char *pDir = taosStrdup(pDisk->dir);
×
561
    if (pDir == NULL) {
×
562
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
563
    }
564
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
×
565
      // put the primary disk to the first position of level 0
566
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
×
567
        taosMemFree(pDir);
×
568
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
569
      }
570
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
×
571
      taosMemFree(pDir);
×
572
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
573
    }
574
  }
575
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
×
576
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
×
577
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
×
578
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
579
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
580
    }
581
  }
582
_exit:
×
583
  if (content != NULL) taosMemoryFreeClear(content);
×
584
  if (pJson != NULL) cJSON_Delete(pJson);
×
585
  if (pFile != NULL) taosCloseFile(&pFile);
×
586
  if (pDisks != NULL) {
×
587
    taosArrayDestroy(pDisks);
×
588
    pDisks = NULL;
×
589
  }
590
  if (code != 0) {
×
591
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
592
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
593
  } else {
594
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
×
595
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
596
  }
597
  TAOS_RETURN(code);
×
598
}
599

600
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
601
  int32_t       code = 0, lino = 0;
×
602
  SWrapperCfg  *pCfgs = NULL;
×
603
  int32_t       numOfVnodes = 0;
×
604
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
×
605
  TdDirPtr      pDir = NULL;
×
606
  TdDirEntryPtr de = NULL;
×
607
  SVnodeMgmt    vnodeMgmt = {0};
×
608
  SArray       *pVgCfgs = NULL;
×
609
  SArray       *pDbInfos = NULL;
×
610
  SArray       *pDiskPrimarys = NULL;
×
611

612
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
613
  vnodeMgmt.path = path;
×
614
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
×
615
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
×
616
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
×
617
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
×
618

619
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
×
620
  int32_t nVgDropped = 0, j = 0;
×
621
  for (int32_t i = 0; i < numOfVnodes; ++i) {
×
622
    SWrapperCfg *pCfg = &pCfgs[i];
×
623
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
624
    if (nDiskLevel0 > 1) {
×
625
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
×
626
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
627
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
×
628
                     pCfg->vgId);
629
    }
630
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
×
631
    if (pCfg->dropped) {
×
632
      ++nVgDropped;
×
633
      continue;
×
634
    }
635
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
×
636
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
637
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
638
    }
639
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
×
640
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
×
641
    if (pInfo->config.syncCfg.replicaNum > 1) {
×
642
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
643
             pInfo->config.syncCfg.replicaNum);
644
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
645
    } else if (pInfo->config.vgId != pCfg->vgId) {
×
646
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
647
             pCfg->vgId);
648
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
649
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
×
650
               pInfo->config.walCfg.encryptAlgorithm) {
×
651
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
×
652
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
653
             pInfo->config.tsdbCfg.encryptAlgorithm);
654
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
655
    }
656
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
×
657
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
×
658
  }
659
  if (nVgDropped > 0) {
×
660
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
661
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
662
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
663
  }
664
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
×
665
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
×
666
  if (nVgCfg != nDiskPrimary) {
×
667
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
668
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
669
  }
670
  if (nVgCfg > 1) {
×
671
    taosArraySort(pVgCfgs, compareVnodeInfo);
×
672
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
×
673
  }
674

675
  int64_t clusterId = pMountInfo->clusterId;
×
676
  int64_t dbId = 0, vgId = 0, nDb = 0;
×
677
  for (int32_t i = 0; i < nVgCfg; ++i) {
×
678
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
×
679
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
×
680
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
×
681
             pInfo->config.syncCfg.nodeInfo->clusterId);
682
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
683
    }
684
    if (dbId != pInfo->config.dbId) {
×
685
      dbId = pInfo->config.dbId;
×
686
      ++nDb;
×
687
    }
688
    if (vgId == pInfo->config.vgId) {
×
689
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
690
    } else {
691
      vgId = pInfo->config.vgId;
×
692
    }
693
  }
694

695
  if (nDb > 0) {
×
696
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
×
697
    int32_t dbIdx = -1;
×
698
    for (int32_t i = 0; i < nVgCfg; ++i) {
×
699
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
×
700
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
×
701
      SMountDbInfo *pDbInfo = NULL;
×
702
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
×
703
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
×
704
        pDbInfo->dbId = pVgCfg->config.dbId;
×
705
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
×
706
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
×
707
      } else {
708
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
×
709
      }
710
      SMountVgInfo vgInfo = {
×
711
          .diskPrimary = pDiskPrimary->diskPrimary,
×
712
          .vgId = pVgCfg->config.vgId,
×
713
          .dbId = pVgCfg->config.dbId,
×
714
          .cacheLastSize = pVgCfg->config.cacheLastSize,
×
715
          .szPage = pVgCfg->config.szPage,
×
716
          .szCache = pVgCfg->config.szCache,
×
717
          .szBuf = pVgCfg->config.szBuf,
×
718
          .cacheLast = pVgCfg->config.cacheLast,
×
719
          .standby = pVgCfg->config.standby,
×
720
          .hashMethod = pVgCfg->config.hashMethod,
×
721
          .hashBegin = pVgCfg->config.hashBegin,
×
722
          .hashEnd = pVgCfg->config.hashEnd,
×
723
          .hashPrefix = pVgCfg->config.hashPrefix,
×
724
          .hashSuffix = pVgCfg->config.hashSuffix,
×
725
          .sttTrigger = pVgCfg->config.sttTrigger,
×
726
          .replications = pVgCfg->config.syncCfg.replicaNum,
×
727
          .precision = pVgCfg->config.tsdbCfg.precision,
×
728
          .compression = pVgCfg->config.tsdbCfg.compression,
×
729
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
×
730
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
×
731
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
×
732
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
×
733
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
×
734
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
×
735
          .minRows = pVgCfg->config.tsdbCfg.minRows,
×
736
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
×
737
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
×
738
          .ssChunkSize = pVgCfg->config.ssChunkSize,
×
739
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
×
740
          .ssCompact = pVgCfg->config.ssCompact,
×
741
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
×
742
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
×
743
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
×
744
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
×
745
          .walSegSize = pVgCfg->config.walCfg.segSize,
×
746
          .walLevel = pVgCfg->config.walCfg.level,
×
747
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
×
748
          .committed = pVgCfg->state.committed,
×
749
          .commitID = pVgCfg->state.commitID,
×
750
          .commitTerm = pVgCfg->state.commitTerm,
×
751
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
×
752
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
×
753
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
×
754
      };
755
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
×
756
    }
757
  }
758

759
  pMountInfo->pDbs = pDbInfos;
×
760

761
_exit:
×
762
  if (code != 0) {
×
763
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
764
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
765
  }
766
  taosArrayDestroy(pDiskPrimarys);
×
767
  taosArrayDestroy(pVgCfgs);
×
768
  taosMemoryFreeClear(pCfgs);
×
769
  TAOS_RETURN(code);
×
770
}
771

772
/**
773
 *   Retrieve the stables from vnode meta.
774
 */
775
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
776
  int32_t code = 0, lino = 0;
×
777
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
×
778
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
×
779
  SArray *suidList = NULL;
×
780
  SArray *pCols = NULL;
×
781
  SArray *pTags = NULL;
×
782
  SArray *pColExts = NULL;
×
783
  SArray *pTagExts = NULL;
×
784

785
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
786
  for (int32_t i = 0; i < nDb; ++i) {
×
787
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
×
788
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
×
789
    for (int32_t j = 0; j < nVg; ++j) {
×
790
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
×
791
      SVnode        vnode = {
×
792
                 .config.vgId = pVgInfo->vgId,
×
793
                 .config.dbId = pVgInfo->dbId,
×
794
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
×
795
                 .config.szPage = pVgInfo->szPage,
×
796
                 .config.szCache = pVgInfo->szCache,
×
797
                 .config.szBuf = pVgInfo->szBuf,
×
798
                 .config.cacheLast = pVgInfo->cacheLast,
×
799
                 .config.standby = pVgInfo->standby,
×
800
                 .config.hashMethod = pVgInfo->hashMethod,
×
801
                 .config.hashBegin = pVgInfo->hashBegin,
×
802
                 .config.hashEnd = pVgInfo->hashEnd,
×
803
                 .config.hashPrefix = pVgInfo->hashPrefix,
×
804
                 .config.hashSuffix = pVgInfo->hashSuffix,
×
805
                 .config.sttTrigger = pVgInfo->sttTrigger,
×
806
                 .config.syncCfg.replicaNum = pVgInfo->replications,
×
807
                 .config.tsdbCfg.precision = pVgInfo->precision,
×
808
                 .config.tsdbCfg.compression = pVgInfo->compression,
×
809
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
×
810
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
×
811
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
×
812
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
×
813
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
×
814
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
×
815
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
×
816
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
×
817
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
×
818
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
×
819
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
×
820
                 .config.ssCompact = pVgInfo->ssCompact,
×
821
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
×
822
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
×
823
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
×
824
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
×
825
                 .config.walCfg.segSize = pVgInfo->walSegSize,
×
826
                 .config.walCfg.level = pVgInfo->walLevel,
×
827
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
×
828
                 .diskPrimary = pVgInfo->diskPrimary,
×
829
      };
830
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
×
831
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
×
832
               pVgInfo->vgId);
833
      vnode.path = path;
×
834

835
      int32_t rollback = vnodeShouldRollback(&vnode);
×
836
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
×
837
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
838
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
839
        TAOS_CHECK_EXIT(code);
×
840
      } else {
841
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
×
842
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
843

844
        SMetaReader mr = {0};
×
845
        tb_uid_t    suid = 0;
×
846
        SMeta      *pMeta = vnode.pMeta;
×
847

848
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
×
849
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
850
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
851
        }
852
        taosArrayClear(suidList);
×
853
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
×
854
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
×
855
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
856
        int32_t nStbs = taosArrayGetSize(suidList);
×
857
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
×
858
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
859
        }
860
        for (int32_t i = 0; i < nStbs; ++i) {
×
861
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
×
862
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
×
863
                pVgInfo->dbId, suid, pReq->dnodeId);
864
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
×
865
            TSDB_CHECK_CODE(code, lino, _exit0);
×
866
          }
867
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
×
868
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
×
869
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
870
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
871
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
872
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
873
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
874
          }
875
          SMountStbInfo stbInfo = {
×
876
              .req.source = TD_REQ_FROM_APP,
877
              .req.suid = suid,
878
              .req.colVer = mr.me.stbEntry.schemaRow.version,
×
879
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
×
880
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
×
881
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
×
882
          };
883
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
×
884
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
×
885
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
886
          }
887
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
×
888
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
889
          }
890

891
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
×
892
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
893
          }
894
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
×
895
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
896
          }
897
          taosArrayClear(pCols);
×
898
          taosArrayClear(pTags);
×
899
          taosArrayClear(pColExts);
×
900
          taosArrayClear(pTagExts);
×
901
          stbInfo.req.pColumns = pCols;
×
902
          stbInfo.req.pTags = pTags;
×
903
          stbInfo.pColExts = pColExts;
×
904
          stbInfo.pTagExts = pTagExts;
×
905

906
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
×
907
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
×
908
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
×
909
            SFieldWithOptions col = {
×
910
                .type = pSchema->type,
×
911
                .flags = pSchema->flags,
×
912
                .bytes = pSchema->bytes,
×
913
                .compress = pColComp->alg,
×
914
            };
915
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
×
916
            if (pSchema->colId != pColComp->id) {
×
917
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
918
            }
919
            if (mr.me.pExtSchemas) {
×
920
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
921
            }
922
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
×
923
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
×
924
          }
925
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
×
926
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
×
927
            SField   tag = {
×
928
                  .type = pSchema->type,
×
929
                  .flags = pSchema->flags,
×
930
                  .bytes = pSchema->bytes,
×
931
            };
932
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
×
933
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
×
934
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
×
935
          }
936
          tDecoderClear(&mr.coder);
×
937

938
          // serialize the SMountStbInfo
939
          int32_t firstPartLen = 0;
×
940
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
×
941
          if (msgLen <= 0) {
×
942
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
943
          }
944
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
×
945
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
×
946
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
×
947
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
×
948
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
×
949
            taosMemoryFree(pBuf);
×
950
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
951
          }
952
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
×
953
            taosMemoryFree(pBuf);
×
954
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
955
          }
956
        }
957
      _exit0:
×
958
        metaReaderClear(&mr);
×
959
        metaClose(&vnode.pMeta);
×
960
        TAOS_CHECK_EXIT(code);
×
961
      }
962
      break;  // retrieve stbs from one vnode is enough
×
963
    }
964
  }
965
_exit:
×
966
  if (code != 0) {
×
967
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
968
           pReq->dnodeId, tstrerror(code), path);
969
  }
970
  taosArrayDestroy(suidList);
×
971
  taosArrayDestroy(pCols);
×
972
  taosArrayDestroy(pTags);
×
973
  taosArrayDestroy(pColExts);
×
974
  taosArrayDestroy(pTagExts);
×
975
  TAOS_RETURN(code);
×
976
}
977

978
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
×
979
  int32_t code = 0, lino = 0;
×
980
  int32_t retryTimes = 0;
×
981
  char    filepath[PATH_MAX] = {0};
×
982
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
×
983
  TSDB_CHECK_NULL((*pFile = taosOpenFile(filepath, TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)), code, lino, _exit,
×
984
                  terrno);
985
  int32_t ret = 0;
×
986
  do {
987
    ret = taosLockFile(*pFile);
×
988
    if (ret == 0) break;
×
989
    taosMsleep(1000);
×
990
    ++retryTimes;
×
991
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret),
×
992
           retryTimes);
993
  } while (retryTimes < retryLimit);
×
994
  TAOS_CHECK_EXIT(ret);
×
995
_exit:
×
996
  if (code != 0) {
×
997
    taosCloseFile(pFile);
×
998
    *pFile = NULL;
×
999
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
×
1000
           filepath);
1001
  }
1002
  TAOS_RETURN(code);
×
1003
}
1004

1005
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
1006
  int32_t code = 0, lino = 0;
×
1007
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
×
1008
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1009
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
×
1010
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
×
1011
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1012
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
×
1013
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1014
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
1015
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1016
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
×
1017
           TD_DIRSEP);
1018
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
×
1019
_exit:
×
1020
  if (code != 0) {
×
1021
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
1022
           pReq->dnodeId, tstrerror(code), path);
1023
  }
1024
  TAOS_RETURN(code);
×
1025
}
1026

1027
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
×
1028
                                       SMountInfo *pMountInfo) {
1029
  int32_t code = 0, lino = 0;
×
1030
  pMountInfo->dnodeId = pReq->dnodeId;
×
1031
  pMountInfo->mountUid = pReq->mountUid;
×
1032
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
×
1033
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
×
1034
  pMountInfo->ignoreExist = pReq->ignoreExist;
×
1035
  pMountInfo->valLen = pReq->valLen;
×
1036
  pMountInfo->pVal = pReq->pVal;
×
1037
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
×
1038
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
×
1039
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
×
1040
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
×
1041
_exit:
×
1042
  if (code != 0) {
×
1043
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
1044
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1045
  }
1046
  TAOS_RETURN(code);
×
1047
}
1048

1049
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1050
  int32_t               code = 0, lino = 0;
×
1051
  int32_t               rspCode = 0;
×
1052
  SVnodeMgmt            vndMgmt = {0};
×
1053
  SMountInfo            mountInfo = {0};
×
1054
  void                 *pBuf = NULL;
×
1055
  int32_t               bufLen = 0;
×
1056
  SRetrieveMountPathReq req = {0};
×
1057

1058
  vndMgmt = *pMgmt;
×
1059
  vndMgmt.path = NULL;
×
1060
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
×
1061
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
×
1062
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
×
1063
_end:
×
1064
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
×
1065
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
×
1066
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
×
1067
  pMsg->info.rsp = pBuf;
×
1068
  pMsg->info.rspLen = bufLen;
×
1069
_exit:
×
1070
  if (rspCode != 0) {
×
1071
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1072
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1073
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1074
    rpcFreeCont(pBuf);
×
1075
    code = rspCode;
×
1076
  } else if (code != 0) {
×
1077
    // the client would receive the response with error msg
1078
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
×
1079
           req.dnodeId, tstrerror(code), req.mountPath);
1080
  } else {
1081
    int32_t nVgs = 0;
×
1082
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
×
1083
    for (int32_t i = 0; i < nDbs; ++i) {
×
1084
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
×
1085
      nVgs += taosArrayGetSize(pDb->pVgs);
×
1086
    }
1087
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
×
1088
  }
1089
  taosMemFreeClear(vndMgmt.path);
×
1090
  tFreeMountInfo(&mountInfo, false);
×
1091
  TAOS_RETURN(code);
×
1092
}
1093

1094
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
×
1095
                            SMountVnodeReq *req, STfs *pMountTfs) {
1096
  int32_t    code = 0;
×
1097
  SVnodeInfo info = {0};
×
1098
  char       hostDir[TSDB_FILENAME_LEN] = {0};
×
1099
  char       mountDir[TSDB_FILENAME_LEN] = {0};
×
1100
  char       mountVnode[32] = {0};
×
1101

1102
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
×
1103
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1104
    return code;
×
1105
  }
1106

1107
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
×
1108
  if ((code = taosMkDir(hostDir))) {
×
1109
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1110
           tstrerror(code), hostDir);
1111
    return code;
×
1112
  }
1113

1114
  info.config = *pCfg;  // copy the config
×
1115
  info.state.committed = req->committed;
×
1116
  info.state.commitID = req->commitID;
×
1117
  info.state.commitTerm = req->commitTerm;
×
1118
  info.state.applied = req->committed;
×
1119
  info.state.applyTerm = req->commitTerm;
×
1120
  info.config.vndStats.numOfSTables = req->numOfSTables;
×
1121
  info.config.vndStats.numOfCTables = req->numOfCTables;
×
1122
  info.config.vndStats.numOfNTables = req->numOfNTables;
×
1123

1124
  SVnodeInfo oldInfo = {0};
×
1125
  oldInfo.config = vnodeCfgDefault;
×
1126
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
×
1127
    if (oldInfo.config.dbId != info.config.dbId) {
×
1128
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1129
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1130
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1131
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1132
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1133
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1134

1135
    } else {
1136
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1137
    }
1138
    return code;
×
1139
  }
1140

1141
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
×
1142
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
×
1143
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
×
1144
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
×
1145
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1146
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
×
1147
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
×
1148
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
×
1149
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
×
1150
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1151
             mountSubDir, hostSubDir, tstrerror(code));
1152
      return code;
×
1153
    }
1154
  }
1155
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
×
1156
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
×
1157
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1158
           req->mountName, tstrerror(code), hostDir);
1159
    return code;
×
1160
  }
1161
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
×
1162
  return 0;
×
1163
}
1164

1165
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1166
  int32_t          code = 0, lino = 0;
×
1167
  SMountVnodeReq   req = {0};
×
1168
  SCreateVnodeReq *pCreateReq = &req.createReq;
×
1169
  SVnodeCfg        vnodeCfg = {0};
×
1170
  SWrapperCfg      wrapperCfg = {0};
×
1171
  SVnode          *pImpl = NULL;
×
1172
  STfs            *pMountTfs = NULL;
×
1173
  char             path[TSDB_FILENAME_LEN] = {0};
×
1174
  bool             releaseTfs = false;
×
1175

1176
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1177
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1178
    return TSDB_CODE_INVALID_MSG;
×
1179
  }
1180

1181
  if (pCreateReq->learnerReplica == 0) {
×
1182
    pCreateReq->learnerSelfIndex = -1;
×
1183
  }
1184
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
×
1185
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1186
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1187
  }
1188
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
×
1189
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1190
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1191
  }
1192

1193
  SReplica *pReplica = NULL;
×
1194
  if (pCreateReq->selfIndex != -1) {
×
1195
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
×
1196
  } else {
1197
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1198
  }
1199
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
×
1200
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
×
1201
    (void)tFreeSMountVnodeReq(&req);
×
1202
    code = TSDB_CODE_INVALID_MSG;
×
1203
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1204
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1205
    return code;
×
1206
  }
1207
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
×
1208
  vnodeCfg.mountVgId = req.mountVgId;
×
1209
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
×
1210
  wrapperCfg.mountId = req.mountId;
×
1211

1212
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
×
1213
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
×
1214
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1215
    (void)tFreeSMountVnodeReq(&req);
×
1216
    vmReleaseVnode(pMgmt, pVnode);
×
1217
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1218
    return 0;
×
1219
  }
1220
  vmReleaseVnode(pMgmt, pVnode);
×
1221

1222
  wrapperCfg.diskPrimary = req.diskPrimary;
×
1223
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
×
1224
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
×
1225
  releaseTfs = true;
×
1226

1227
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
×
1228
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
×
1229
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1230
  }
1231
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
×
1232
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1233
  }
1234
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
×
1235
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
×
1236
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
×
1237
_exit:
×
1238
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
×
1239
  if (code != 0) {
×
1240
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1241
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1242
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1243
    vnodeClose(pImpl);
×
1244
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1245
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1246
  } else {
1247
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
×
1248
          TMSG_INFO(pMsg->msgType));
1249
  }
1250

1251
  pMsg->code = code;
×
1252
  pMsg->info.rsp = NULL;
×
1253
  pMsg->info.rspLen = 0;
×
1254

1255
  (void)tFreeSMountVnodeReq(&req);
×
1256
  TAOS_RETURN(code);
×
1257
}
1258
#endif  // USE_MOUNT
1259

1260
// alter replica doesn't use this, but restore dnode still use this
1261
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1262
  SAlterVnodeTypeReq req = {0};
×
1263
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1264
    terrno = TSDB_CODE_INVALID_MSG;
×
1265
    return -1;
×
1266
  }
1267

1268
  if (req.learnerReplicas == 0) {
1269
    req.learnerSelfIndex = -1;
1270
  }
1271

1272
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
×
1273
        TMSG_INFO(pMsg->msgType));
1274

1275
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1276
  if (pVnode == NULL) {
×
1277
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1278
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1279
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1280
    return -1;
×
1281
  }
1282

1283
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1284
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1285
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1286
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1287
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1288
    vmReleaseVnode(pMgmt, pVnode);
×
1289
    return -1;
×
1290
  }
1291

1292
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1293
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1294
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1295
    vmReleaseVnode(pMgmt, pVnode);
×
1296
    return -1;
×
1297
  }
1298

1299
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1300

1301
  int32_t vgId = req.vgId;
×
1302
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
×
1303
        req.selfIndex, req.strict, req.changeVersion);
1304
  for (int32_t i = 0; i < req.replica; ++i) {
×
1305
    SReplica *pReplica = &req.replicas[i];
×
1306
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1307
  }
1308
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
×
1309
    SReplica *pReplica = &req.learnerReplicas[i];
×
1310
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1311
  }
1312

1313
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
×
1314
      req.learnerSelfIndex >= req.learnerReplica) {
×
1315
    terrno = TSDB_CODE_INVALID_MSG;
×
1316
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1317
    vmReleaseVnode(pMgmt, pVnode);
×
1318
    return -1;
×
1319
  }
1320

1321
  SReplica *pReplica = NULL;
×
1322
  if (req.selfIndex != -1) {
×
1323
    pReplica = &req.replicas[req.selfIndex];
×
1324
  } else {
1325
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1326
  }
1327

1328
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
×
1329
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
×
1330
    terrno = TSDB_CODE_INVALID_MSG;
×
1331
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
1332
           pReplica->port);
1333
    vmReleaseVnode(pMgmt, pVnode);
×
1334
    return -1;
×
1335
  }
1336

1337
  dInfo("vgId:%d, start to close vnode", vgId);
×
1338
  SWrapperCfg wrapperCfg = {
×
1339
      .dropped = pVnode->dropped,
×
1340
      .vgId = pVnode->vgId,
×
1341
      .vgVersion = pVnode->vgVersion,
×
1342
      .diskPrimary = pVnode->diskPrimary,
×
1343
  };
1344
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
×
1345

1346
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
×
1347
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
×
1348

1349
  int32_t diskPrimary = wrapperCfg.diskPrimary;
×
1350
  char    path[TSDB_FILENAME_LEN] = {0};
×
1351
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
×
1352

1353
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
×
1354
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
×
1355
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1356
    return -1;
×
1357
  }
1358

1359
  dInfo("vgId:%d, begin to open vnode", vgId);
×
1360
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
×
1361
  if (pImpl == NULL) {
×
1362
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1363
    return -1;
×
1364
  }
1365

1366
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
×
1367
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1368
    return -1;
×
1369
  }
1370

1371
  if (vnodeStart(pImpl) != 0) {
×
1372
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1373
    return -1;
×
1374
  }
1375

1376
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
×
1377
        req.vgId, TMSG_INFO(pMsg->msgType));
1378
  return 0;
×
1379
}
1380

1381
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1382
  SCheckLearnCatchupReq req = {0};
×
1383
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1384
    terrno = TSDB_CODE_INVALID_MSG;
×
1385
    return -1;
×
1386
  }
1387

1388
  if (req.learnerReplicas == 0) {
1389
    req.learnerSelfIndex = -1;
1390
  }
1391

1392
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1393
        TMSG_INFO(pMsg->msgType));
1394

1395
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1396
  if (pVnode == NULL) {
×
1397
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1398
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1399
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1400
    return -1;
×
1401
  }
1402

1403
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1404
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1405
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1406
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1407
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1408
    vmReleaseVnode(pMgmt, pVnode);
×
1409
    return -1;
×
1410
  }
1411

1412
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1413
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1414
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1415
    vmReleaseVnode(pMgmt, pVnode);
×
1416
    return -1;
×
1417
  }
1418

1419
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1420

1421
  vmReleaseVnode(pMgmt, pVnode);
×
1422

1423
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1424
        TMSG_INFO(pMsg->msgType));
1425

1426
  return 0;
×
1427
}
1428

1429
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1430
  SDisableVnodeWriteReq req = {0};
×
1431
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1432
    terrno = TSDB_CODE_INVALID_MSG;
×
1433
    return -1;
×
1434
  }
1435

1436
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
×
1437

1438
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1439
  if (pVnode == NULL) {
×
1440
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1441
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1442
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1443
    return -1;
×
1444
  }
1445

1446
  pVnode->disable = req.disable;
×
1447
  vmReleaseVnode(pMgmt, pVnode);
×
1448
  return 0;
×
1449
}
1450

1451
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1452
  SAlterVnodeHashRangeReq req = {0};
×
1453
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1454
    terrno = TSDB_CODE_INVALID_MSG;
×
1455
    return -1;
×
1456
  }
1457

1458
  int32_t srcVgId = req.srcVgId;
×
1459
  int32_t dstVgId = req.dstVgId;
×
1460

1461
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
×
1462
  if (pVnode != NULL) {
×
1463
    dError("vgId:%d, vnode already exist", dstVgId);
×
1464
    vmReleaseVnode(pMgmt, pVnode);
×
1465
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1466
    return -1;
×
1467
  }
1468

1469
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
×
1470
        req.dstVgId);
1471
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
×
1472
  if (pVnode == NULL) {
×
1473
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1474
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1475
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1476
    return -1;
×
1477
  }
1478

1479
  SWrapperCfg wrapperCfg = {
×
1480
      .dropped = pVnode->dropped,
×
1481
      .vgId = dstVgId,
1482
      .vgVersion = pVnode->vgVersion,
×
1483
      .diskPrimary = pVnode->diskPrimary,
×
1484
  };
1485
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
×
1486

1487
  // prepare alter
1488
  pVnode->toVgId = dstVgId;
×
1489
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
×
1490
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1491
    return -1;
×
1492
  }
1493

1494
  dInfo("vgId:%d, close vnode", srcVgId);
×
1495
  vmCloseVnode(pMgmt, pVnode, true, false);
×
1496

1497
  int32_t diskPrimary = wrapperCfg.diskPrimary;
×
1498
  char    srcPath[TSDB_FILENAME_LEN] = {0};
×
1499
  char    dstPath[TSDB_FILENAME_LEN] = {0};
×
1500
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
×
1501
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
×
1502

1503
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
×
1504
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
×
1505
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1506
    return -1;
×
1507
  }
1508

1509
  dInfo("vgId:%d, open vnode", dstVgId);
×
1510
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
×
1511

1512
  if (pImpl == NULL) {
×
1513
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1514
    return -1;
×
1515
  }
1516

1517
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
×
1518
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1519
    return -1;
×
1520
  }
1521

1522
  if (vnodeStart(pImpl) != 0) {
×
1523
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1524
    return -1;
×
1525
  }
1526

1527
  // complete alter
1528
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
×
1529
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1530
    return -1;
×
1531
  }
1532

1533
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
×
1534
  return 0;
×
1535
}
1536

1537
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1538
  SAlterVnodeReplicaReq alterReq = {0};
×
1539
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
×
1540
    terrno = TSDB_CODE_INVALID_MSG;
×
1541
    return -1;
×
1542
  }
1543

1544
  if (alterReq.learnerReplica == 0) {
×
1545
    alterReq.learnerSelfIndex = -1;
×
1546
  }
1547

1548
  int32_t vgId = alterReq.vgId;
×
1549
  dInfo(
×
1550
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1551
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1552
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1553
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1554

1555
  for (int32_t i = 0; i < alterReq.replica; ++i) {
×
1556
    SReplica *pReplica = &alterReq.replicas[i];
×
1557
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
×
1558
  }
1559
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
×
1560
    SReplica *pReplica = &alterReq.learnerReplicas[i];
×
1561
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
×
1562
  }
1563

1564
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
×
1565
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
×
1566
    terrno = TSDB_CODE_INVALID_MSG;
×
1567
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1568
    return -1;
×
1569
  }
1570

1571
  SReplica *pReplica = NULL;
×
1572
  if (alterReq.selfIndex != -1) {
×
1573
    pReplica = &alterReq.replicas[alterReq.selfIndex];
×
1574
  } else {
1575
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1576
  }
1577

1578
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
×
1579
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
×
1580
    terrno = TSDB_CODE_INVALID_MSG;
×
1581
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
1582
           pReplica->port);
1583
    return -1;
×
1584
  }
1585

1586
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
×
1587
  if (pVnode == NULL) {
×
1588
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1589
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1590
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1591
    return -1;
×
1592
  }
1593

1594
  dInfo("vgId:%d, start to close vnode", vgId);
×
1595
  SWrapperCfg wrapperCfg = {
×
1596
      .dropped = pVnode->dropped,
×
1597
      .vgId = pVnode->vgId,
×
1598
      .vgVersion = pVnode->vgVersion,
×
1599
      .diskPrimary = pVnode->diskPrimary,
×
1600
  };
1601
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
×
1602

1603
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
×
1604
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
×
1605

1606
  int32_t diskPrimary = wrapperCfg.diskPrimary;
×
1607
  char    path[TSDB_FILENAME_LEN] = {0};
×
1608
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
×
1609

1610
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
×
1611
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
×
1612
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1613
    return -1;
×
1614
  }
1615

1616
  dInfo("vgId:%d, begin to open vnode", vgId);
×
1617
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
×
1618
  if (pImpl == NULL) {
×
1619
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1620
    return -1;
×
1621
  }
1622

1623
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
×
1624
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1625
    return -1;
×
1626
  }
1627

1628
  if (vnodeStart(pImpl) != 0) {
×
1629
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1630
    return -1;
×
1631
  }
1632

1633
  dInfo(
×
1634
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1635
      "learnerSelfIndex:%d strict:%d",
1636
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1637
      alterReq.learnerSelfIndex, alterReq.strict);
1638
  return 0;
×
1639
}
1640

1641
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
16✔
1642
  int32_t       code = 0;
16✔
1643
  SDropVnodeReq dropReq = {0};
16✔
1644
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
16!
1645
    terrno = TSDB_CODE_INVALID_MSG;
×
1646
    return terrno;
×
1647
  }
1648

1649
  int32_t vgId = dropReq.vgId;
16✔
1650
  dInfo("vgId:%d, start to drop vnode", vgId);
16!
1651

1652
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
16!
1653
    terrno = TSDB_CODE_INVALID_MSG;
×
1654
    dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
×
1655
    return terrno;
×
1656
  }
1657

1658
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
16✔
1659
  if (pVnode == NULL) {
16!
1660
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1661
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1662
    return terrno;
×
1663
  }
1664

1665
  pVnode->dropped = 1;
16✔
1666
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
16!
1667
    pVnode->dropped = 0;
×
1668
    vmReleaseVnode(pMgmt, pVnode);
×
1669
    return code;
×
1670
  }
1671

1672
  vmCloseVnode(pMgmt, pVnode, false, false);
16✔
1673
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
16!
1674
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1675
  }
1676

1677
  dInfo("vgId:%d, is dropped", vgId);
16!
1678
  return 0;
16✔
1679
}
1680

1681
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1682
  SVArbHeartBeatReq arbHbReq = {0};
×
1683
  SVArbHeartBeatRsp arbHbRsp = {0};
×
1684
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
×
1685
    terrno = TSDB_CODE_INVALID_MSG;
×
1686
    return -1;
×
1687
  }
1688

1689
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
×
1690
    terrno = TSDB_CODE_INVALID_MSG;
×
1691
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
×
1692
    goto _OVER;
×
1693
  }
1694

1695
  if (strlen(arbHbReq.arbToken) == 0) {
×
1696
    terrno = TSDB_CODE_INVALID_MSG;
×
1697
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1698
    goto _OVER;
×
1699
  }
1700

1701
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
×
1702

1703
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
×
1704
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
×
1705
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
×
1706
  if (arbHbRsp.hbMembers == NULL) {
×
1707
    goto _OVER;
×
1708
  }
1709

1710
  for (int32_t i = 0; i < size; i++) {
×
1711
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
×
1712
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
×
1713
    if (pVnode == NULL) {
×
1714
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
×
1715
      continue;
×
1716
    }
1717

1718
    SVArbHbRspMember rspMember = {0};
×
1719
    rspMember.vgId = pReqMember->vgId;
×
1720
    rspMember.hbSeq = pReqMember->hbSeq;
×
1721
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
×
1722
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1723
      vmReleaseVnode(pMgmt, pVnode);
×
1724
      continue;
×
1725
    }
1726

1727
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
×
1728
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1729
      vmReleaseVnode(pMgmt, pVnode);
×
1730
      continue;
×
1731
    }
1732

1733
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
×
1734
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1735
      vmReleaseVnode(pMgmt, pVnode);
×
1736
      goto _OVER;
×
1737
    }
1738

1739
    vmReleaseVnode(pMgmt, pVnode);
×
1740
  }
1741

1742
  SRpcMsg rspMsg = {.info = pMsg->info};
×
1743
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
×
1744
  if (rspLen < 0) {
×
1745
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1746
    goto _OVER;
×
1747
  }
1748

1749
  void *pRsp = rpcMallocCont(rspLen);
×
1750
  if (pRsp == NULL) {
×
1751
    terrno = terrno;
×
1752
    goto _OVER;
×
1753
  }
1754

1755
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
×
1756
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1757
    rpcFreeCont(pRsp);
×
1758
    goto _OVER;
×
1759
  }
1760
  pMsg->info.rsp = pRsp;
×
1761
  pMsg->info.rspLen = rspLen;
×
1762

1763
  terrno = TSDB_CODE_SUCCESS;
×
1764

1765
_OVER:
×
1766
  tFreeSVArbHeartBeatReq(&arbHbReq);
×
1767
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
×
1768
  return terrno;
×
1769
}
1770

1771
SArray *vmGetMsgHandles() {
15✔
1772
  int32_t code = -1;
15✔
1773
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
15✔
1774
  if (pArray == NULL) goto _OVER;
15!
1775

1776
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1777
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1778
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1779
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1780
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1781
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1782
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1783
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1784
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1785
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1786
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1787
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1788
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1789
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1790
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1791
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1792
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1793
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1794
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1795
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1796
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1797
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1798
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1799
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1800
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1801
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1802
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1803
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1804
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1805
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1806
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1807
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
15!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1812
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1813
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1814
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1815
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1816
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1817
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1818
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
15!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1824

1825
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1832
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1834
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
15!
1835
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
15!
1836
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
15!
1837
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1838
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1839
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1840
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1841

1842
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1843
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1844
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1845
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1846
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1847
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1848
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1849
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1850
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1851
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1852
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1853
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1854

1855
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
15!
1856
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
15!
1857
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
15!
1858
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
15!
1859
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
15!
1860

1861
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
15!
1862
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
15!
1863
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
15!
1864

1865
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
15!
1866
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
15!
1867
  code = 0;
15✔
1868

1869
_OVER:
15✔
1870
  if (code != 0) {
15!
1871
    taosArrayDestroy(pArray);
×
1872
    return NULL;
×
1873
  } else {
1874
    return pArray;
15✔
1875
  }
1876
}
1877

1878
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
1879
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
1880

1881
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
1882
  while (pIter) {
×
1883
    SVnodeObj **ppVnode = pIter;
×
1884
    if (ppVnode == NULL || *ppVnode == NULL) {
×
1885
      continue;
×
1886
    }
1887

1888
    SVnodeObj *pVnode = *ppVnode;
×
1889
    if (!pVnode->failed) {
×
1890
      SRawWriteMetrics metrics = {0};
×
1891
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
1892
        // Add the metrics to the global metrics system with cluster ID
1893
        SName   name = {0};
×
1894
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1895
        if (code < 0) {
×
1896
          dError("failed to get db name since %s", tstrerror(code));
×
1897
          continue;
×
1898
        }
1899
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
1900
        if (code != TSDB_CODE_SUCCESS) {
×
1901
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
1902
        } else {
1903
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
1904
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
1905
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
1906
          }
1907
        }
1908
      } else {
1909
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
1910
      }
1911
    }
1912
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
1913
  }
1914

1915
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
1916
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc