• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5050

12 May 2026 05:36AM UTC coverage: 73.398% (+0.09%) from 73.313%
#5050

push

travis-ci

web-flow
merge: from main to 3.0 branch #35319

90 of 101 new or added lines in 2 files covered. (89.11%)

489 existing lines in 125 files now uncovered.

281602 of 383662 relevant lines covered (73.4%)

138099127.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.3
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22
#include "tencrypt.h"
23

24
extern taos_counter_t *tsInsertCounter;
25

26
#ifdef TD_ENTERPRISE
27
// Forward declaration for enterprise function
28
extern int32_t vnodeGetCompactProgress(SVnode *pVnode, int32_t compactId, SQueryCompactProgressRsp *pRsp);
29
#endif
30

31
// Forward declaration for function defined in metrics.c
32
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
33
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
34

35
// Collect one SVnodeLoad entry per vnode found in the running hash.
//
// pMgmt   - vnode management object; its running hash is walked under the read lock.
// pInfo   - output; pInfo->pVloads receives a freshly initialised array. When the
//           array cannot be initialised it is left NULL and nothing else happens.
// isReset - when true, vnodeResetLoad() is called for every non-failed vnode right
//           after its load has been read.
//
// Failures of vnodeGetLoad() and taosArrayPush() are logged and otherwise ignored.
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-step, so `continue` can never re-visit the
  // same entry (the previous while-loop skipped the advance and would spin forever
  // with the lock held if an empty slot was ever encountered).
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    // A failed vnode still contributes an entry, carrying only its vgId.
    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
64

65
// Apply tsVnodeElectIntervalMs to every vnode in the running hash via
// vnodeSetSyncTimeout(). The hash is walked under the read lock; a failure for
// one vnode is logged and the walk continues with the next one.
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-step guarantees progress even when an entry is skipped;
  // the previous `continue` inside a while-loop bypassed the advance.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
83

84
// Collect an SVnodeLoadLite entry for every non-failed vnode in the running hash.
//
// pInfo->pVloads receives a freshly initialised array, or NULL when either the
// initialisation or a later push fails (the partially filled array is destroyed
// in that case). Vnodes whose vnodeGetLoadLite() call fails are silently skipped.
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-step guarantees progress even when an entry is skipped;
  // the previous `continue` inside a while-loop bypassed the advance.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // NOTE(review): this leaves the iteration early; confirm whether the hash API
      // requires taosHashCancelIterate() to be called for pIter before breaking out.
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
111

112
// Fill pInfo with monitoring data for this vnode management object.
//
// The per-vnode loads are fetched with reset enabled (vmGetVnodeLoads(..., true)),
// summed up, and the totals are stored both in pInfo->vstat and in pMgmt->state.
// Afterwards the tfs monitor info is read into pInfo->tfs; a failure there is only
// logged. If the load array cannot be obtained the function returns without
// touching pInfo or pMgmt->state.
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoads = loadInfo.pVloads;
  if (pLoads == NULL) return;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pLoads); ++idx) {
    SVnodeLoad *pEntry = taosArrayGet(pLoads, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    // Both plain and assigned leaders are counted as masters.
    switch (pEntry->syncState) {
      case TAOS_SYNC_STATE_LEADER:
      case TAOS_SYNC_STATE_ASSIGNED_LEADER:
        leaderCnt++;
        break;
      default:
        break;
    }
    vnodeCnt++;
  }

  // Report to the monitor. The insert counters are marked "delta" by the original
  // author.
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  // Mirror the same totals into the management state.
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
160

161
// Delete tsInsertCounter samples whose vgroup id is no longer present in the
// running hash.
//
// The key / vgroup-id lists are obtained from taos_counter_get_vgroup_ids(); the
// two list arrays are released with taosMemoryFree() before returning. A failed
// delete is logged and does not stop the scan.
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
  if (list_size == 0) return;

  // Initialised so the NULL checks at the end never read indeterminate pointers,
  // even if the callee leaves an out-parameter untouched.
  int32_t *vgroup_ids = NULL;
  char   **keys = NULL;

  int r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
  if (r) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (int i = 0; i < list_size; i++) {
    int32_t vgroup_id = vgroup_ids[i];
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
    if (vnode != NULL) continue;  // vgroup still running, keep its sample

    r = taos_counter_delete(tsInsertCounter, keys[i]);
    if (r) {
      dError("failed to delete monitor sample key:%s", keys[i]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (vgroup_ids) taosMemoryFree(vgroup_ids);
  if (keys) taosMemoryFree(keys);
}
188

189
// Remove metrics that belong to vgroups which are no longer in the running hash.
//
// Does nothing unless monitoring and metrics are enabled and a monitor fqdn/port
// is configured. Otherwise a temporary set of the currently running vgIds is
// built under the read lock and handed to cleanupExpiredMetrics(); a failure of
// that call is logged.
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Create the set before starting to iterate: the previous ordering obtained the
  // first iterator and then returned on allocation failure without finishing or
  // cancelling that iteration.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    int32_t vgId = pVnode->vgId;
    char    dummy = 1;  // hash table value (we only care about the key)
    if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
      dError("failed to put vgId:%d to valid vgroups hash", vgId);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
223

224
// Build a vnode configuration from a create-vnode request.
//
// pCfg starts as a copy of vnodeCfgDefault and is then overwritten field by field
// from pCreate. The sync node list is filled with the voter replicas first
// (role TAOS_SYNC_ROLE_VOTER) followed by the learner replicas
// (role TAOS_SYNC_ROLE_LEARNER); replicaNum ends up as the voter count and
// totalReplicaNum as voters + learners. myIndex is taken from selfIndex, or from
// replica + learnerSelfIndex when the latter is not -1.
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->cacheLastShardBits = pCreate->cacheLastShardBits;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;

  // tsdb section
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // encryptAlgr is not assigned here, so it still holds the value copied from
  // vnodeCfgDefault; the data key is copied when either that value is DND_CA_SM4
  // or the request names an algorithm.
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgr = 0;
#endif

  // wal section
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->walCfg.encryptAlgr = 0;
#endif

  // tdb encryption
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tdbEncryptAlgr = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  pCfg->isAudit = pCreate->isAudit;
  pCfg->allowDrop = pCreate->allowDrop;
  pCfg->secureDelete = pCreate->secureDelete;
  pCfg->securityLevel = pCreate->securityLevel;

  // sync section
  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters: replicaNum doubles as the read index into pCreate->replicas.
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
    // The return value is intentionally not used here.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // Learners follow the voters in nodeInfo; until the final adjustment below,
  // totalReplicaNum counts learners only and is the read index into learnerReplicas.
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
3,579,768✔
333

334
// Initialise the wrapper configuration for a vnode that is about to be created:
// the vnode is marked as not dropped, ids/versions come from the request, and the
// path is "<pMgmt->path><TD_DIRSEP>vnode<vgId>" (truncated to sizeof(pCfg->path)).
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
3,609,506✔
340

341
// Handle a create-vnode request.
//
// Steps: deserialize the request, verify that the replica entry addressed by
// selfIndex / learnerSelfIndex describes this dnode, make sure the encryption key
// is available when one is needed, then create, open and start the vnode and
// persist the vnode list.
//
// Returns 0 on success and also when the vnode already exists (see below).
// Error returns visible in this function:
//   TSDB_CODE_INVALID_MSG                 - request cannot be deserialized, or it
//                                           carries no usable self index
//   TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL  - replica entry does not match this dnode
//   terrno                                - taosWaitCfgKeyLoaded() failed
//   TSDB_CODE_DNODE_INVALID_ENCRYPTKEY    - algorithm requested but tsDataKey empty
//   code from vnodeCreate / vnodeOpen / vmOpenVnode / vnodeStart /
//   vmWriteVnodeListToFile otherwise; terrno is set to the returned code on the
//   paths that reach _OVER.
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s, "
      "isAudit:%" PRIu8 " allowDrop:%" PRIu8 " securityLevel:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm, req.encryptAlgrName, req.isAudit, req.allowDrop, req.securityLevel);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Log the learner's own dnode id (previously the voter array was indexed here).
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Locate the replica entry that is supposed to describe this dnode. A request
  // with neither index set would otherwise index learnerReplicas[-1].
  SReplica *pReplica = NULL;
  if (req.selfIndex >= 0) {
    pReplica = &req.replicas[req.selfIndex];
  } else if (req.learnerSelfIndex >= 0) {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica == NULL) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, invalid selfIndex:%d learnerSelfIndex:%d in request, %s", req.vgId, req.selfIndex,
           req.learnerSelfIndex, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    // Log before releasing the request: pReplica points into req.
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    code = terrno;  // capture before any other call can overwrite it
    dError("vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgrName[0] != '\0' && strlen(tsDataKey) == 0) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
    dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    // An existing vnode is reported in the log but the request itself succeeds.
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    return 0;
  }

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    // Roll back: drop the half-created vnode and its on-disk data.
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
485

486
#ifdef USE_MOUNT
487
// (dbId, vgId, diskPrimary) triple. compareVgDiskPrimary() orders values of this
// type by dbId first and vgId second; diskPrimary does not take part in the order.
typedef struct {
  int64_t dbId;
  int32_t vgId;
  int32_t diskPrimary;
} SMountDbVgId;
492
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
493
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
494

495
static int compareVnodeInfo(const void *p1, const void *p2) {
3,626✔
496
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
3,626✔
497
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
3,626✔
498

499
  if (v1->config.dbId == v2->config.dbId) {
3,626✔
500
    if (v1->config.vgId == v2->config.vgId) {
2,072✔
501
      return 0;
×
502
    }
503
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
2,072✔
504
  }
505

506
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
1,554✔
507
}
508
static int compareVgDiskPrimary(const void *p1, const void *p2) {
3,626✔
509
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
3,626✔
510
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
3,626✔
511

512
  if (v1->dbId == v2->dbId) {
3,626✔
513
    if (v1->vgId == v2->vgId) {
2,072✔
514
      return 0;
×
515
    }
516
    return v1->vgId > v2->vgId ? 1 : -1;
2,072✔
517
  }
518

519
  return v1->dbId > v2->dbId ? 1 : -1;
1,554✔
520
}
521

522
// Read the dnode-level information of a mount path into pMountInfo.
//
// Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount when
//        the dnode is marked dropped, when encryptScope is non-zero, or when the
//        clusterId is 0; otherwise pMountInfo->clusterId is set.
// Step 2 obtains the disk list through vmGetMountDisks() and stores a duplicated
//        directory string per disk in pMountInfo->pDisks[level]; a primary disk is
//        inserted at the front of level 0 when that level already has entries.
//
// Returns 0 on success or an error code; the outcome is logged either way.
// Strings/arrays already stored in pMountInfo are not released here on failure.
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;
  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // NOTE(review): if terrno can be 0 after a short read, this check may not leave
    // the function - confirm against TAOS_CHECK_EXIT and taosReadFile.
    TAOS_CHECK_EXIT(terrno);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;
  // Release the step-1 resources early; _exit repeats the checks for error paths.
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);
  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
    // The level comes from the mounted configuration and is used as an index into
    // pMountInfo->pDisks[], so reject values outside [0, TFS_MAX_TIERS).
    if (pDisk->level < 0 || pDisk->level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d of dir:%s", pReq->mountName, (int32_t)pDisk->level, pDisk->dir);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
    if (!pMountInfo->pDisks[pDisk->level]) {
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[pDisk->level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  // Level 0 needs at least one disk; no level may exceed TFS_MAX_DISKS_PER_TIER.
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
619

620
/**
 * Load and validate the vnodes of a mounted data directory, then group them by database.
 *
 * The vnode list is read through vmGetVnodeListFromFile() from "<mountPath><sep><vnode node name>".
 * Every vnode that is not flagged as dropped must be readable/writable, have at most one replica,
 * carry the same vgId as recorded in the list and use no encryption. All vnodes must also belong
 * to pMountInfo->clusterId.
 *
 * @param pMgmt       not referenced by this function
 * @param pReq        mount request; mountPath, mountName and dnodeId are read
 * @param pMountInfo  in: pDisks[0] and clusterId; out: pDbs (assigned only when no step failed)
 * @return 0 on success, otherwise the error code of the first failed step
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;
  int32_t      dbIdx = -1;  // index of the last SMountDbInfo slot that has been filled in pDbInfos

  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  // pVgCfgs is pre-sized to numOfVnodes slots, pDiskPrimarys grows by push
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, nVgLoaded = 0;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char **ppDiskDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!ppDiskDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *ppDiskDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    // valid vnodes are packed at the front of pVgCfgs: slots [0, nVgLoaded)
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, nVgLoaded++);
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // Dropped vnodes never consumed a slot, so exactly nVgDropped unused slots remain at the tail,
    // starting at index nVgLoaded. Trim them so pVgCfgs lines up with pDiskPrimarys.
    // NOTE(review): relies on taosArrayRemoveBatch(array, index, num, fp) semantics - confirm against tarray.h.
    taosArrayRemoveBatch(pVgCfgs, nVgLoaded, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    // both arrays are sorted so that index i of one is expected to describe the same vnode as index i of the other
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    // count a new database whenever the dbId changes; the first vnode always opens a database so that the
    // count agrees with the grouping loop below even if its dbId equals the initial value
    if (i == 0 || dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    // the same vgId twice in a row means a duplicated vnode
    if (vgId == pInfo->config.vgId) {
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .isAudit = pVgCfg->config.isAudit,
          .allowDrop = pVgCfg->config.allowDrop,
          .secureDelete = pVgCfg->config.secureDelete,
          .securityLevel = pVgCfg->config.securityLevel,
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  pMountInfo->pDbs = pDbInfos;
  pDbInfos = NULL;  // ownership moved to pMountInfo

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  }
  if (pDbInfos != NULL) {
    // still owned here, i.e. a step failed after allocation: release the groups built so far.
    // Only slots [0, dbIdx] were filled; a slot whose pVgs allocation failed holds NULL.
    for (int32_t i = 0; i <= dbIdx; ++i) {
      SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
      taosArrayDestroy(pDbInfo->pVgs);
      pDbInfo->pVgs = NULL;
    }
    taosArrayDestroy(pDbInfos);
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
796

797
/**
798
 *   Retrieve the stables from vnode meta.
799
 */
800
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
344✔
801
  int32_t code = 0, lino = 0;
344✔
802
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
344✔
803
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
344✔
804
  SArray *suidList = NULL;
344✔
805
  SArray *pCols = NULL;
344✔
806
  SArray *pTags = NULL;
344✔
807
  SArray *pColExts = NULL;
344✔
808
  SArray *pTagExts = NULL;
344✔
809

810
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
344✔
811
  for (int32_t i = 0; i < nDb; ++i) {
1,032✔
812
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
688✔
813
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
688✔
814
    for (int32_t j = 0; j < nVg; ++j) {
688✔
815
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
688✔
816
      SVnode        vnode = {
688✔
817
                 .config.vgId = pVgInfo->vgId,
688✔
818
                 .config.dbId = pVgInfo->dbId,
688✔
819
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
688✔
820
                 .config.szPage = pVgInfo->szPage,
688✔
821
                 .config.szCache = pVgInfo->szCache,
688✔
822
                 .config.szBuf = pVgInfo->szBuf,
688✔
823
                 .config.cacheLast = pVgInfo->cacheLast,
688✔
824
                 .config.standby = pVgInfo->standby,
688✔
825
                 .config.hashMethod = pVgInfo->hashMethod,
688✔
826
                 .config.hashBegin = pVgInfo->hashBegin,
688✔
827
                 .config.hashEnd = pVgInfo->hashEnd,
688✔
828
                 .config.hashPrefix = pVgInfo->hashPrefix,
688✔
829
                 .config.hashSuffix = pVgInfo->hashSuffix,
688✔
830
                 .config.sttTrigger = pVgInfo->sttTrigger,
688✔
831
                 .config.syncCfg.replicaNum = pVgInfo->replications,
688✔
832
                 .config.tsdbCfg.precision = pVgInfo->precision,
688✔
833
                 .config.tsdbCfg.compression = pVgInfo->compression,
688✔
834
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
688✔
835
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
688✔
836
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
688✔
837
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
688✔
838
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
688✔
839
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
688✔
840
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
688✔
841
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
688✔
842
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
688✔
843
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
688✔
844
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
688✔
845
                 .config.ssCompact = pVgInfo->ssCompact,
688✔
846
                 .config.isAudit = pVgInfo->isAudit,
688✔
847
                 .config.allowDrop = pVgInfo->allowDrop,
688✔
848
                 .config.secureDelete = pVgInfo->secureDelete,
688✔
849
                 .config.securityLevel = pVgInfo->securityLevel,
688✔
850
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
688✔
851
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
688✔
852
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
688✔
853
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
688✔
854
                 .config.walCfg.segSize = pVgInfo->walSegSize,
688✔
855
                 .config.walCfg.level = pVgInfo->walLevel,
688✔
856
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
857
                 .diskPrimary = pVgInfo->diskPrimary,
688✔
858
      };
859
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
688✔
860
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
688✔
861
               pVgInfo->vgId);
862
      vnode.path = path;
688✔
863

864
      int32_t rollback = vnodeShouldRollback(&vnode);
688✔
865
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
688✔
866
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
867
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
868
        TAOS_CHECK_EXIT(code);
×
869
      } else {
870
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
688✔
871
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
872

873
        SMetaReader mr = {0};
688✔
874
        tb_uid_t    suid = 0;
688✔
875
        SMeta      *pMeta = vnode.pMeta;
688✔
876

877
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
688✔
878
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
688✔
879
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
880
        }
881
        taosArrayClear(suidList);
688✔
882
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
688✔
883
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
688✔
884
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
885
        int32_t nStbs = taosArrayGetSize(suidList);
688✔
886
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
688✔
887
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
888
        }
889
        for (int32_t i = 0; i < nStbs; ++i) {
2,752✔
890
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
2,064✔
891
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
2,064✔
892
                pVgInfo->dbId, suid, pReq->dnodeId);
893
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
2,064✔
894
            TSDB_CHECK_CODE(code, lino, _exit0);
×
895
          }
896
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
2,064✔
897
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
2,064✔
898
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
899
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
900
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
901
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
902
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
903
          }
904
          SMountStbInfo stbInfo = {
2,064✔
905
              .req.source = TD_REQ_FROM_APP,
906
              .req.suid = suid,
907
              .req.colVer = mr.me.stbEntry.schemaRow.version,
2,064✔
908
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
2,064✔
909
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
2,064✔
910
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
2,064✔
911
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
2,064✔
912
          };
913
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
2,064✔
914
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
2,064✔
915
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
916
          }
917
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
2,064✔
918
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
919
          }
920

921
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
2,064✔
922
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
923
          }
924
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
2,064✔
925
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
926
          }
927
          taosArrayClear(pCols);
2,064✔
928
          taosArrayClear(pTags);
2,064✔
929
          taosArrayClear(pColExts);
2,064✔
930
          taosArrayClear(pTagExts);
2,064✔
931
          stbInfo.req.pColumns = pCols;
2,064✔
932
          stbInfo.req.pTags = pTags;
2,064✔
933
          stbInfo.pColExts = pColExts;
2,064✔
934
          stbInfo.pTagExts = pTagExts;
2,064✔
935

936
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
12,384✔
937
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
10,320✔
938
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
10,320✔
939
            SFieldWithOptions col = {
10,320✔
940
                .type = pSchema->type,
10,320✔
941
                .flags = pSchema->flags,
10,320✔
942
                .bytes = pSchema->bytes,
10,320✔
943
                .compress = pColComp->alg,
10,320✔
944
            };
945
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
10,320✔
946
            if (pSchema->colId != pColComp->id) {
10,320✔
947
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
948
            }
949
            if (mr.me.pExtSchemas) {
10,320✔
950
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
951
            }
952
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
10,320✔
953
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
20,640✔
954
          }
955
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
4,816✔
956
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
2,752✔
957
            SField   tag = {
2,752✔
958
                  .type = pSchema->type,
2,752✔
959
                  .flags = pSchema->flags,
2,752✔
960
                  .bytes = pSchema->bytes,
2,752✔
961
            };
962
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
2,752✔
963
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
2,752✔
964
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
5,504✔
965
          }
966
          tDecoderClear(&mr.coder);
2,064✔
967

968
          // serialize the SMountStbInfo
969
          int32_t firstPartLen = 0;
2,064✔
970
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
2,064✔
971
          if (msgLen <= 0) {
2,064✔
972
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
973
          }
974
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
2,064✔
975
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
2,064✔
976
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
2,064✔
977
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
2,064✔
978
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
2,064✔
979
            taosMemoryFree(pBuf);
×
980
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
981
          }
982
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
4,128✔
983
            taosMemoryFree(pBuf);
×
984
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
985
          }
986
        }
987
      _exit0:
688✔
988
        metaReaderClear(&mr);
688✔
989
        metaClose(&vnode.pMeta);
688✔
990
        TAOS_CHECK_EXIT(code);
688✔
991
      }
992
      break;  // retrieve stbs from one vnode is enough
688✔
993
    }
994
  }
995
_exit:
344✔
996
  if (code != 0) {
344✔
997
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
998
           pReq->dnodeId, tstrerror(code), path);
999
  }
1000
  taosArrayDestroy(suidList);
344✔
1001
  taosArrayDestroy(pCols);
344✔
1002
  taosArrayDestroy(pTags);
344✔
1003
  taosArrayDestroy(pColExts);
344✔
1004
  taosArrayDestroy(pTagExts);
344✔
1005
  TAOS_RETURN(code);
344✔
1006
}
1007

1008
/**
 * Open (creating/truncating) "<mountPath><sep>.running" and take a lock on it.
 *
 * The lock is attempted at least once and at most retryLimit times, waiting one second between
 * attempts. No wait is performed after the last failed attempt since nothing follows it.
 *
 * @param mountName   mount name, used for logging only
 * @param mountPath   directory that holds the ".running" file
 * @param pFile       out: the opened and locked file; closed and reset to NULL on failure
 * @param retryLimit  maximum number of lock attempts
 * @return 0 on success, otherwise the error of the open or of the last lock attempt
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0, lino = 0;
  int32_t retryTimes = 0;
  int32_t ret = 0;
  char    filepath[PATH_MAX] = {0};

  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
                  code, lino, _exit, terrno);

  for (;;) {
    ret = taosLockFile(*pFile);
    if (ret == 0) break;
    ++retryTimes;
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
    if (retryTimes >= retryLimit) break;  // out of attempts: fail right away instead of sleeping once more
    taosMsleep(1000);
  }
  TAOS_CHECK_EXIT(ret);

_exit:
  if (code != 0) {
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           filepath);
  }
  TAOS_RETURN(code);
}
1034

1035
/**
 * Sanity-check a mount path before anything is read from it.
 *
 * Checks, in order: the mount path itself, the ".running" lock (up to 3 attempts, the file handle is
 * stored in pMountInfo->pFile), "<dnode>/dnode.json", the mnode directory, the vnode directory and
 * "<dnode>/config/local.json". The first failing check ends the function.
 *
 * @param pMgmt       not referenced by this function
 * @param pReq        mount request; mountPath, mountName and dnodeId are read
 * @param pMountInfo  out: pFile receives the lock file handle
 * @return 0 on success, otherwise the error code of the failed check
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};

  // `path` always names the location being checked, so the error log below is meaningful even when
  // the mount path itself or the running-lock check is what fails
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);
  // NOTE(review): O_RDONLY is an open(2) flag, while other callers in this file pass TD_FILE_ACCESS_*
  // options to taosCheckAccessFile - confirm the intended access mode.
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));

  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
           TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1056

1057
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
1,207✔
1058
                                       SMountInfo *pMountInfo) {
1059
  int32_t code = 0, lino = 0;
1,207✔
1060
  pMountInfo->dnodeId = pReq->dnodeId;
1,207✔
1061
  pMountInfo->mountUid = pReq->mountUid;
1,207✔
1062
  (void)snprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
1,207✔
1063
  (void)snprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
1,207✔
1064
  pMountInfo->ignoreExist = pReq->ignoreExist;
1,207✔
1065
  pMountInfo->valLen = pReq->valLen;
1,207✔
1066
  pMountInfo->pVal = pReq->pVal;
1,207✔
1067
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
1,207✔
1068
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
518✔
1069
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
518✔
1070
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
344✔
1071
_exit:
344✔
1072
  if (code != 0) {
1,207✔
1073
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
863✔
1074
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1075
  }
1076
  TAOS_RETURN(code);
1,207✔
1077
}
1078

1079
/**
 * Handle a "retrieve mount path" request.
 *
 * The request is decoded and the mount is inspected through vmRetrieveMountPathImpl(). Whatever the
 * outcome, the collected SMountInfo is serialized into pMsg->info.rsp; `code` carries the outcome of
 * the retrieval while `rspCode` tracks failures while building the response.
 *
 * @param pMgmt  vnode management context; a shallow copy with path reset to NULL is used
 * @param pMsg   rpc message holding the request; receives the response buffer and its length
 * @return 0 on success, rspCode when the response could not be built, otherwise the retrieval error
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0, lino = 0;
  int32_t               rspCode = 0;
  int32_t               bufLen = 0;
  void                 *pBuf = NULL;
  SRetrieveMountPathReq req = {0};
  SMountInfo            mountInfo = {0};
  SVnodeMgmt            vndMgmt = *pMgmt;

  vndMgmt.path = NULL;
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // build the response: measure, allocate, then encode
  bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo);
  TSDB_CHECK_CONDITION(bufLen >= 0, rspCode, lino, _exit, bufLen);
  pBuf = rpcMallocCont(bufLen);
  TSDB_CHECK_CONDITION(pBuf, rspCode, lino, _exit, terrno);
  bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo);
  TSDB_CHECK_CONDITION(bufLen >= 0, rspCode, lino, _exit, bufLen);
  pMsg->info.rsp = pBuf;
  pMsg->info.rspLen = bufLen;

_exit:
  if (rspCode != 0) {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pBuf);
    code = rspCode;
  } else if (code != 0) {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  } else {
    // success: report how many databases and vgroups were found
    int32_t dbCount = taosArrayGetSize(mountInfo.pDbs);
    int32_t vgCount = 0;
    for (int32_t dbPos = 0; dbPos < dbCount; ++dbPos) {
      SMountDbInfo *pDbEntry = TARRAY_GET_ELEM(mountInfo.pDbs, dbPos);
      vgCount += taosArrayGetSize(pDbEntry->pVgs);
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, dbCount, vgCount, req.mountPath);
  }
  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1123

1124
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
1,376✔
1125
                            SMountVnodeReq *req, STfs *pMountTfs) {
1126
  int32_t    code = 0;
1,376✔
1127
  SVnodeInfo info = {0};
1,376✔
1128
  char       hostDir[TSDB_FILENAME_LEN] = {0};
1,376✔
1129
  char       mountDir[TSDB_FILENAME_LEN] = {0};
1,376✔
1130
  char       mountVnode[32] = {0};
1,376✔
1131

1132
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
1,376✔
1133
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1134
    return code;
×
1135
  }
1136

1137
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
1,376✔
1138
  if ((code = taosMkDir(hostDir))) {
1,376✔
1139
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1140
           tstrerror(code), hostDir);
1141
    return code;
×
1142
  }
1143

1144
  info.config = *pCfg;  // copy the config
1,376✔
1145
  info.state.committed = req->committed;
1,376✔
1146
  info.state.commitID = req->commitID;
1,376✔
1147
  info.state.commitTerm = req->commitTerm;
1,376✔
1148
  info.state.applied = req->committed;
1,376✔
1149
  info.state.applyTerm = req->commitTerm;
1,376✔
1150
  info.config.vndStats.numOfSTables = req->numOfSTables;
1,376✔
1151
  info.config.vndStats.numOfCTables = req->numOfCTables;
1,376✔
1152
  info.config.vndStats.numOfNTables = req->numOfNTables;
1,376✔
1153

1154
  SVnodeInfo oldInfo = {0};
1,376✔
1155
  oldInfo.config = vnodeCfgDefault;
1,376✔
1156
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
1,376✔
1157
    if (oldInfo.config.dbId != info.config.dbId) {
×
1158
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1159
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1160
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1161
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1162
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1163
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1164

1165
    } else {
1166
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1167
    }
1168
    return code;
×
1169
  }
1170

1171
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
1,376✔
1172
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
1,376✔
1173
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
1,376✔
1174
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
1,376✔
1175
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1176
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
8,256✔
1177
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
6,880✔
1178
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
6,880✔
1179
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
6,880✔
1180
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1181
             mountSubDir, hostSubDir, tstrerror(code));
1182
      return code;
×
1183
    }
1184
  }
1185
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
1,376✔
1186
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
1,376✔
1187
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1188
           req->mountName, tstrerror(code), hostDir);
1189
    return code;
×
1190
  }
1191
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
1,376✔
1192
  return 0;
1,376✔
1193
}
1194

1195
/**
 * Handle a mount-vnode request: validate the request against the local dnode,
 * prepare the host vnode directory from the mount, then open and start the vnode
 * and persist the vnode/mount lists.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SMountVnodeReq; on the main path
 *              pMsg->code is set and the response buffer is cleared
 * @return 0 on success (and when the vnode already exists), otherwise an error code
 */
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // log the learner's own dnode id (the voter array was indexed here before)
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  // selfIndex/learnerSelfIndex are used as array indexes below, so reject out-of-range
  // values first (same check as the alter-replica handlers in this file)
  if (pCreateReq->replica <= 0 || (pCreateReq->selfIndex < 0 && pCreateReq->learnerSelfIndex < 0) ||
      pCreateReq->selfIndex >= pCreateReq->replica || pCreateReq->learnerSelfIndex >= pCreateReq->learnerReplica) {
    code = TSDB_CODE_INVALID_MSG;
    dError("mount:%s, vgId:%d, failed to mount vnode since invalid replica index, reason:%s", req.mountName,
           pCreateReq->vgId, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (pCreateReq->selfIndex != -1) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // log before releasing the request: the message reads fields that live inside req
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    code = terrno;  // capture before any further call can overwrite terrno
    dError("mount:%s, vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.mountName,
           pCreateReq->vgId, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): an already existing vnode is reported as success, as before;
    // confirm that callers rely on this rather than on TSDB_CODE_VND_ALREADY_EXIST.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;  // from here on the failure path must give the mount tfs back

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1297
#endif  // USE_MOUNT
1298

1299
// alter replica doesn't use this, but restore dnode still use this
1300
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,346,823✔
1301
  SAlterVnodeTypeReq req = {0};
2,346,823✔
1302
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,346,823✔
1303
    terrno = TSDB_CODE_INVALID_MSG;
×
1304
    return -1;
×
1305
  }
1306

1307
  if (req.learnerReplicas == 0) {
1308
    req.learnerSelfIndex = -1;
1309
  }
1310

1311
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
2,346,823✔
1312
        TMSG_INFO(pMsg->msgType));
1313

1314
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
2,346,823✔
1315
  if (pVnode == NULL) {
2,346,823✔
1316
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1317
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1318
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1319
    return -1;
×
1320
  }
1321

1322
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
2,346,823✔
1323
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
2,346,823✔
1324
  if (role == TAOS_SYNC_ROLE_VOTER) {
2,346,823✔
1325
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1326
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1327
    vmReleaseVnode(pMgmt, pVnode);
×
1328
    return -1;
×
1329
  }
1330

1331
  dInfo("vgId:%d, checking node catch up", req.vgId);
2,346,823✔
1332
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
2,346,823✔
1333
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
2,237,525✔
1334
    vmReleaseVnode(pMgmt, pVnode);
2,237,525✔
1335
    return -1;
2,237,525✔
1336
  }
1337

1338
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
109,298✔
1339

1340
  int32_t vgId = req.vgId;
109,298✔
1341
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
109,298✔
1342
        req.selfIndex, req.strict, req.changeVersion);
1343
  for (int32_t i = 0; i < req.replica; ++i) {
436,581✔
1344
    SReplica *pReplica = &req.replicas[i];
327,283✔
1345
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
327,283✔
1346
  }
1347
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
109,298✔
1348
    SReplica *pReplica = &req.learnerReplicas[i];
×
1349
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1350
  }
1351

1352
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
109,298✔
1353
      req.learnerSelfIndex >= req.learnerReplica) {
109,298✔
1354
    terrno = TSDB_CODE_INVALID_MSG;
×
1355
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1356
    vmReleaseVnode(pMgmt, pVnode);
×
1357
    return -1;
×
1358
  }
1359

1360
  SReplica *pReplica = NULL;
109,298✔
1361
  if (req.selfIndex != -1) {
109,298✔
1362
    pReplica = &req.replicas[req.selfIndex];
109,298✔
1363
  } else {
1364
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1365
  }
1366

1367
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
109,298✔
1368
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
109,298✔
1369
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1370
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1371
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1372
    vmReleaseVnode(pMgmt, pVnode);
×
1373
    return -1;
×
1374
  }
1375

1376
  dInfo("vgId:%d, start to close vnode", vgId);
109,298✔
1377
  SWrapperCfg wrapperCfg = {
109,298✔
1378
      .dropped = pVnode->dropped,
109,298✔
1379
      .vgId = pVnode->vgId,
109,298✔
1380
      .vgVersion = pVnode->vgVersion,
109,298✔
1381
      .diskPrimary = pVnode->diskPrimary,
109,298✔
1382
  };
1383
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
109,298✔
1384

1385
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
109,298✔
1386
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
109,298✔
1387

1388
  int32_t diskPrimary = wrapperCfg.diskPrimary;
109,298✔
1389
  char    path[TSDB_FILENAME_LEN] = {0};
109,298✔
1390
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
109,298✔
1391

1392
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
109,298✔
1393
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
109,298✔
1394
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1395
    return -1;
×
1396
  }
1397

1398
  dInfo("vgId:%d, begin to open vnode", vgId);
109,298✔
1399
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
109,298✔
1400
  if (pImpl == NULL) {
109,298✔
1401
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1402
    return -1;
×
1403
  }
1404

1405
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
109,298✔
1406
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1407
    return -1;
×
1408
  }
1409

1410
  if (vnodeStart(pImpl) != 0) {
109,298✔
1411
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1412
    return -1;
×
1413
  }
1414

1415
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
109,298✔
1416
        req.vgId, TMSG_INFO(pMsg->msgType));
1417
  return 0;
109,298✔
1418
}
1419

1420
/**
 * Handle a check-learner-catchup request: report whether the local learner vnode
 * has caught up, without changing it.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SCheckLearnCatchupReq
 * @return 0 when the vnode exists, is not yet a voter and has caught up;
 *         -1 otherwise with terrno set
 */
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // test the learner COUNT: the previous check compared the learnerReplicas array
  // itself against 0, which can never be true
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // log the reason left in terrno by the failed acquire before overwriting it
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1467

1468
/**
 * Handle a disable-vnode-write request by storing the requested flag on the vnode.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SDisableVnodeWriteReq
 * @return 0 on success, -1 with terrno set when the message is invalid or the vnode
 *         cannot be acquired
 */
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq disableReq = {0};

  int32_t parsed = tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &disableReq);
  if (parsed != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", disableReq.vgId, disableReq.disable);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, disableReq.vgId);
  if (pTarget != NULL) {
    pTarget->disable = disableReq.disable;
    vmReleaseVnode(pMgmt, pTarget);
    return 0;
  }

  // the acquire failed: log its reason first, then report "vnode not exist"
  dError("vgId:%d, failed to disable write since %s", disableReq.vgId, terrstr());
  terrno = TSDB_CODE_VND_NOT_EXIST;
  return -1;
}
1489

1490
/**
 * Handle a set-keep-version request: apply the requested WAL keep version to the
 * vnode named in the message head.
 *
 * The message starts with an SMsgHead in network byte order, which is converted in
 * place, followed by a serialized SVndSetKeepVersionReq.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message (head + payload)
 * @return 0 on success, -1 on failure with terrno set
 */
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // the head is read and the payload length is derived from contLen below, so a
  // message shorter than the head must be rejected before either is touched
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMsgHead)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVndSetKeepVersionReq req = {0};
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                                        &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    // log the reason left in terrno by the failed acquire before overwriting it
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
  // This bypasses Raft to avoid timing issues where WAL might be deleted
  // before keepVersion is set through the Raft consensus process
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
  if (code != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
    terrno = code;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);

  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1527

1528
/**
 * Handle an alter-hash-range request: close the source vnode, rewrite it on disk as
 * the destination vnode with the new hash range, then open and start the destination.
 *
 * The vnode list is written twice: once before the change with the source vnode's
 * toVgId set to the destination ("prepare"), and once after the destination vnode
 * has been started ("complete").
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeHashRangeReq
 * @return 0 on success, -1 on failure with terrno set
 */
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  // the destination vgId must not be in use yet
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    // log the reason left in terrno by the failed acquire before overwriting it
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // snapshot what is needed to open the destination before the source is closed
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // nothing was changed on disk: undo the in-memory marker and give the acquired
    // vnode back, mirroring how the drop handler rolls back its own prepare step
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1613

1614
/**
 * Handle an alter-vnode-replica request: validate the request against the local
 * dnode, close the vnode, rewrite its replica config on disk, then reopen and start it.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeReplicaReq
 * @return 0 on success, -1 on failure with terrno set
 */
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // "dnode:%d" takes the replica's dnode id (the port was printed twice before)
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // the self indexes are used as array indexes below, so they must be in range
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // log the reason left in terrno by the failed acquire before overwriting it
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // snapshot what is needed to reopen the vnode before the object is closed
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1717

1718
/**
 * Handle an alter-vnode-elect-baseline request by forwarding the requested
 * electBaseLine value to the vnode.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeElectBaselineReq
 * @return 0 on success; TSDB_CODE_INVALID_MSG when the message cannot be parsed;
 *         terrno when the vnode cannot be acquired; -1 when the vnode rejects the value
 */
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeElectBaselineReq baselineReq = {0};

  int32_t parsed = tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &baselineReq);
  if (parsed != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t vgId = baselineReq.vgId;
  dInfo(
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
      vgId, TMSG_INFO(pMsg->msgType), baselineReq.electBaseLine);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, vgId);
  if (pTarget == NULL) {
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    return terrno;
  }

  // the vnode is released on both outcomes; only the return value differs
  int32_t applied = vnodeSetElectBaseline(pTarget->pImpl, baselineReq.electBaseLine);
  vmReleaseVnode(pMgmt, pTarget);

  return (applied != 0) ? -1 : 0;
}
1743

1744
/**
 * Handle a drop-vnode request: mark the vnode as dropped, persist the vnode list,
 * close the vnode and persist the list once more.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SDropVnodeReq
 * @return 0 on success; otherwise terrno (invalid message, dnode mismatch, vnode not
 *         found) or the code returned by the first vnode-list write
 */
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq dropReq = {0};

  int32_t parsed = tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq);
  if (parsed != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  int32_t vgId = dropReq.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  // the request must be addressed to this dnode
  if (pMgmt->pData->dnodeId != dropReq.dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
    return terrno;
  }

  SVnodeObj *pTarget = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (pTarget == NULL) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // persist the dropped flag first; roll it back if the list cannot be written
  pTarget->dropped = 1;
  int32_t code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    pTarget->dropped = 0;
    vmReleaseVnode(pMgmt, pTarget);
    return code;
  }

  vmCloseVnode(pMgmt, pTarget, false, false);

  // a failure of the second write is only logged; the drop is still reported as done
  int32_t rewritten = vmWriteVnodeListToFile(pMgmt);
  if (rewritten != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
1783

1784
/**
 * Handle an arbitrator heartbeat request: for every requested vnode that can be
 * acquired, collect its arb token, push the arb term to it, and return the collected
 * members in the response.
 *
 * Vnodes that cannot be acquired, or whose token/term calls fail, are logged and left
 * out of the response; they do not fail the request.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SVArbHeartBeatReq; on success
 *              pMsg->info.rsp / rspLen receive the serialized response
 * @return -1 when the request cannot be deserialized, otherwise terrno
 *         (TSDB_CODE_SUCCESS on success)
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // the function returns terrno, so make sure a failure never reads as success
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (int32_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // first pass computes the encoded size, second pass fills the rpc buffer
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // keep whatever the allocator reported, but never return success here
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
1873

1874
/**
 * Handle a dnode-query-compact-progress request: gather the compact progress of the
 * vnodes on this dnode and return it in one response.
 *
 * Progress is only collected when TD_ENTERPRISE is defined; otherwise the response
 * carries zero vnodes. Failed vnodes and vnodes without an implementation are skipped.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SDnodeQueryCompactProgressReq; on
 *              success pMsg->info.rsp / rspLen receive the serialized response
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t vmProcessDnodeQueryCompactProgressReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t                       code = 0;
  SDnodeQueryCompactProgressReq req = {0};
  SDnodeQueryCompactProgressRsp rsp = {0};
  SArray                       *pCollected = NULL;  // one SQueryCompactProgressRsp per reporting vnode
  SVnodeObj                   **ppVnodes = NULL;
  int32_t                       numOfVnodes = 0;
  void                         *pRsp = NULL;
  int32_t                       rspLen = 0;

  code = tDeserializeSDnodeQueryCompactProgressReq(pMsg->pCont, pMsg->contLen, &req);
  if (code != 0) {
    dError("dnode:%d, failed to deserialize dnode-query-compact-progress req, code:%s",
           pMgmt->pData->dnodeId, tstrerror(code));
    goto _exit;
  }

  dDebug("dnode:%d, receive dnode-query-compact-progress req, compactId:%d", pMgmt->pData->dnodeId, req.compactId);

  // collect compact progress from all running vnodes
  pCollected = taosArrayInit(16, sizeof(SQueryCompactProgressRsp));
  if (pCollected == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("dnode:%d, failed to get vnode list, code:%s", pMgmt->pData->dnodeId, tstrerror(code));
    goto _exit;
  }

  for (int32_t idx = 0; idx < numOfVnodes; idx++) {
    SVnodeObj *pVnode = ppVnodes[idx];
    if (pVnode == NULL) {
      continue;
    }

    if (!pVnode->failed && pVnode->pImpl != NULL) {
#ifdef TD_ENTERPRISE
      SQueryCompactProgressRsp vnodeRsp = {0};
      vnodeRsp.dnodeId = pMgmt->pData->dnodeId;
      if (vnodeGetCompactProgress(pVnode->pImpl, req.compactId, &vnodeRsp) == 0 && vnodeRsp.compactId != 0) {
        if (taosArrayPush(pCollected, &vnodeRsp) == NULL) {
          dError("dnode:%d, vgId:%d, failed to push compact progress", pMgmt->pData->dnodeId, pVnode->vgId);
        }
      }
#endif
    }

    // every non-NULL entry of the list is released, whether it was queried or skipped
    vmReleaseVnode(pMgmt, pVnode);
  }
  taosMemoryFree(ppVnodes);

  rsp.dnodeId = pMgmt->pData->dnodeId;
  rsp.numOfVnodes = (int32_t)taosArrayGetSize(pCollected);
  if (rsp.numOfVnodes > 0) {
    // the response borrows the array's storage, so the array must outlive serialization
    rsp.vnodeProgress = (SQueryCompactProgressRsp *)taosArrayGet(pCollected, 0);
  } else {
    rsp.vnodeProgress = NULL;
  }

  dInfo("dnode:%d, send dnode-query-compact-progress rsp, numOfVnodes:%d", rsp.dnodeId, rsp.numOfVnodes);

  // first pass computes the encoded size, second pass fills the rpc buffer
  rspLen = tSerializeSDnodeQueryCompactProgressRsp(NULL, 0, &rsp);
  if (rspLen < 0) {
    code = rspLen;
    goto _exit;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (tSerializeSDnodeQueryCompactProgressRsp(pRsp, rspLen, &rsp) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    rpcFreeCont(pRsp);
    pRsp = NULL;
    goto _exit;
  }

  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

_exit:
  // single cleanup point for the progress array (only once it was created)
  if (pCollected != NULL) {
    taosArrayDestroy(pCollected);
  }
  return code;
}
1962

1963
SArray *vmGetMsgHandles() {
719,130✔
1964
  int32_t code = -1;
719,130✔
1965
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
719,130✔
1966
  if (pArray == NULL) goto _OVER;
719,130✔
1967

1968
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1969
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
719,130✔
1970
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
719,130✔
1971
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
719,130✔
1972
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1973
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1974
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1975
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1976
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1977
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1978
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1979
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1980
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1981
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1982
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1983
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1984
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1985
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1986
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1987
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1988
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1989
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1990
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1991
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1992
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1993
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1994
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1995
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
1996
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
719,130✔
1997
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1998
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
1999
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2000
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2001
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2002
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2003
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2004
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2005
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2006
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2007
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2008
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2009
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2010
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2011
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2012
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2013
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2014
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2015

2016
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2017
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2018
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2019
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2020
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2021
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2022
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2023
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2024
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2025
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2026
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2027
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
719,130✔
2028
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2029
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2030
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2031
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2032
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2033
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2034
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2035
  if (dmSetMgmtHandle(pArray, TDMT_DND_QUERY_COMPACT_PROGRESS, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2036
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2037
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2038
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2039
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2040

2041
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2042
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2043
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2044
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2045
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2046
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2047
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2048
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2049
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2050
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2051
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2052
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2053

2054
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
719,130✔
2055
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
719,130✔
2056
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
719,130✔
2057
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
719,130✔
2058
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
719,130✔
2059

2060
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
719,130✔
2061
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2062
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
719,130✔
2063

2064
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
719,130✔
2065
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
719,130✔
2066
  if (dmSetMgmtHandle(pArray, TDMT_VND_AUDIT_RECORD, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
719,130✔
2067

2068
  code = 0;
719,130✔
2069

2070
_OVER:
719,130✔
2071
  if (code != 0) {
719,130✔
2072
    taosArrayDestroy(pArray);
×
2073
    return NULL;
×
2074
  } else {
2075
    return pArray;
719,130✔
2076
  }
2077
}
2078

2079
/*
 * Report the raw write metrics of the vnodes stored in pMgmt->runngingHash.
 *
 * The hash is walked while holding the read side of pMgmt->hashLock. For every vnode that is
 * not marked failed and has an implementation object:
 *   - its raw write metrics are fetched with vnodeGetRawWriteMetrics,
 *   - the database name is parsed from the vnode config and the metrics are handed to
 *     addWriteMetrics together with clusterId,
 *   - if that succeeds, the vnode's counters are reset with vnodeResetRawWriteMetrics.
 * A failure on one vnode is logged and the walk moves on to the next entry.
 *
 * pMgmt     vnode management object owning the hash and the lock.
 * clusterId cluster identifier forwarded to addWriteMetrics.
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the loop header so that every `continue` below moves on to
  // the next entry. Advancing only at the bottom of a while-loop body would be skipped by
  // `continue`, re-examining the same entry forever while the lock is held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) {
      continue;
    }

    SVnodeObj *pVnode = *ppVnode;
    // pImpl is dereferenced below for the db name, so vnodes without one are skipped too.
    if (pVnode->failed || pVnode->pImpl == NULL) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    // Add the metrics to the global metrics system with cluster ID
    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // After successfully adding metrics, reset the vnode's write metrics
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc