• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.63
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22
#include "tencrypt.h"
23

24
extern taos_counter_t *tsInsertCounter;
25

26
// Forward declaration for function defined in metrics.c
27
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
28
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
29

30
// Build a load snapshot for every vnode in the running hash.
//
// pMgmt   - vnode management object; its hashLock is taken for reading while the hash is walked.
// pInfo   - output; pInfo->pVloads receives a new array of SVnodeLoad, or NULL if the array
//           could not be allocated. The array is handed to the caller (vmGetMonitorInfo
//           releases it with taosArrayDestroy).
// isReset - when true, vnodeResetLoad() is called on every non-failed vnode after its load
//           has been read.
//
// A vnode marked `failed` still gets an entry, but only vgId is filled in (the rest of the
// entry is zero-initialised). Errors from vnodeGetLoad()/taosArrayPush() are logged and the
// walk continues.
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator advance lives in the for-header so that `continue` can never skip it.
  // The previous while-loop form re-tested the same pIter forever when a slot was NULL,
  // spinning with the read lock held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
59

60
// Apply tsVnodeElectIntervalMs as the sync timeout of every vnode in the running hash.
//
// The hash is walked under the read side of pMgmt->hashLock. A failure of
// vnodeSetSyncTimeout() for one vnode is logged and does not stop the walk.
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-header guarantees progress even when a slot is skipped; the old
  // `while` + `continue` form never advanced past a NULL slot and looped forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
78

79
// Build a lightweight load snapshot (SVnodeLoadLite) for the non-failed running vnodes.
//
// pInfo->pVloads receives a new array, or NULL when the array cannot be allocated or when
// appending an entry fails (in that case the partially filled array is destroyed).
// Vnodes marked `failed`, and vnodes for which vnodeGetLoadLite() returns non-zero, are
// silently left out of the result.
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Iterator advance is in the for-header so `continue` cannot skip it (the previous
  // while-loop form spun forever on a NULL slot while holding the lock).
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      // Out of memory: report "no data" rather than a truncated list.
      // NOTE(review): this leaves the hash iteration unfinished, exactly as the original
      // did; confirm whether the hash API requires the iterator to be cancelled here.
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
106

107
// Aggregate per-vnode load counters into pInfo->vstat and mirror them into pMgmt->state,
// then fill pInfo->tfs from the tiered file system.
//
// The per-vnode loads are fetched with isReset = true. If no load array could be obtained
// the function returns without touching pInfo or pMgmt->state.
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);
  if (loadInfo.pVloads == NULL) return;

  SArray *pLoads = loadInfo.pVloads;

  int32_t vnodeCount = 0;
  int32_t leaderCount = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pLoads)) {
    SVnodeLoad *pEntry = taosArrayGet(pLoads, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    // Both regular and assigned leaders count towards masterNum.
    switch (pEntry->syncState) {
      case TAOS_SYNC_STATE_LEADER:
      case TAOS_SYNC_STATE_ASSIGNED_LEADER:
        leaderCount++;
        break;
      default:
        break;
    }

    vnodeCount++;
    ++idx;
  }

  // Values reported to the monitor (the insert counters are marked as deltas upstream).
  pInfo->vstat.totalVnodes = vnodeCount;
  pInfo->vstat.masterNum = leaderCount;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  // Same values cached on the management object.
  pMgmt->state.totalVnodes = vnodeCount;
  pMgmt->state.masterNum = leaderCount;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }

  taosArrayDestroy(pLoads);
}
155

156
// Delete tsInsertCounter samples whose vgroup id is no longer present in the running hash.
//
// The key list and the matching vgroup-id list are obtained from
// taos_counter_get_vgroup_ids(); for every id that is not found in pMgmt->runngingHash the
// corresponding counter key is deleted. Lookups are done under the read side of
// pMgmt->hashLock. Only the two list containers are freed here, not the key strings.
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
  if (list_size == 0) return;

  // Start from NULL so the frees below never act on indeterminate pointers should the
  // getter leave an out-parameter untouched.
  int32_t *vgroup_ids = NULL;
  char   **keys = NULL;

  int r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
  if (r) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (int i = 0; i < list_size; i++) {
    int32_t vgroup_id = vgroup_ids[i];
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
    if (vnode != NULL) continue;  // vgroup still running: keep its sample

    r = taos_counter_delete(tsInsertCounter, keys[i]);
    if (r) {
      dError("failed to delete monitor sample key:%s", keys[i]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (vgroup_ids) taosMemoryFree(vgroup_ids);
  if (keys) taosMemoryFree(keys);
}
183

184
// Remove metrics that belong to vgroups which are no longer running on this dnode.
//
// Does nothing unless monitoring and metrics are both enabled and a monitor endpoint
// (tsMonitorFqdn / tsMonitorPort) is configured. Otherwise the vgIds of all entries in the
// running hash are collected into a temporary set, which is handed to
// cleanupExpiredMetrics() and then released.
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Allocate the scratch set BEFORE touching the running hash. Previously the iteration
  // was started first, so an allocation failure returned with an iterator still open on
  // runngingHash; allocating up front also keeps the allocation out of the locked region.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    int32_t vgId = (*ppVnode)->vgId;
    char    dummy = 1;  // hash table value (we only care about the key)
    if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
      dError("failed to put vgId:%d to valid vgroups hash", vgId);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
218

219
// Translate a create-vnode request into a full SVnodeCfg.
//
// pCfg is first overwritten with vnodeCfgDefault, then every field carried by the request
// is copied (page/buffer sizes are converted from KB / MB to bytes). The sync node table is
// rebuilt from scratch: voters from pCreate->replicas first, learners from
// pCreate->learnerReplicas after them. syncCfg.myIndex is only written when the request
// carries a self index; otherwise it keeps the default value.
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  // ---- identity, cache and buffer ----
  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->cacheLastShardBits = pCreate->cacheLastShardBits;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;

  // ---- tsdb ----
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;

  // ---- wal ----
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;

  // ---- encryption (tsdb, wal, tdb) ----
  // The algorithm name always comes from the request; the data key is copied only when the
  // (default) algorithm id is SM4 or an algorithm name was supplied.
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }

  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }

  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgr = 0;
  pCfg->walCfg.encryptAlgr = 0;
  pCfg->tdbEncryptAlgr = 0;
#endif

  // ---- stt / hash range / page size ----
  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  // ---- shared storage ----
  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  // ---- flags ----
  pCfg->isAudit = pCreate->isAudit;
  pCfg->allowDrop = pCreate->allowDrop;
  pCfg->secureDelete = pCreate->secureDelete;

  // ---- sync group ----
  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum tracks the loop index one-for-one.
  for (int32_t voter = 0; voter < pCreate->replica; ++voter) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[voter];
    pNode->nodeId = pCreate->replicas[voter].id;
    pNode->nodePort = pCreate->replicas[voter].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[voter].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }

  // Learners follow the voters. Inside this loop totalReplicaNum counts learners only and
  // doubles as the index into learnerReplicas; the voter count is added afterwards.
  for (int32_t slot = pCfg->syncCfg.replicaNum; slot < pCreate->replica + pCreate->learnerReplica; ++slot) {
    const int32_t learner = pCfg->syncCfg.totalReplicaNum;
    SNodeInfo    *pNode = &pCfg->syncCfg.nodeInfo[slot];
    pNode->nodeId = pCreate->learnerReplicas[learner].id;
    pNode->nodePort = pCreate->learnerReplicas[learner].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[learner].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;

  // A learner self index, when present, takes precedence over the voter self index.
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
3,177,900✔
327

328
// Fill the wrapper (bookkeeping) config of a vnode that is about to be created:
// ids come from the request, `dropped` is cleared, and the path is
// "<pMgmt->path><TD_DIRSEP>vnode<vgId>" (truncated to the size of pCfg->path).
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
3,171,943✔
334

335
// Handle a create-vnode request.
//
// Steps: deserialize the request, check that the replica entry the request designates as
// "self" matches this dnode (id, port, fqdn), wait for the encryption key, build the vnode
// and wrapper configs, then create / open / start the vnode and persist the vnode list.
//
// Returns 0 on success and also when the vnode already exists (see below); otherwise an
// error code. On the paths that reach _OVER, terrno is set to the returned code and a
// partially created vnode is closed and destroyed.
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  // Without learners a learner self index is meaningless; normalise it.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset:%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s, "
      "isAudit:%" PRIu8 " allowDrop:%" PRIu8,
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm, req.encryptAlgrName, req.isAudit, req.allowDrop);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Log the learner's own dnode id (this used to print req.replicas[i].id by mistake).
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // The request must designate this dnode either as a voter or as a learner. With neither
  // index set, the lookup below would read learnerReplicas[-1]; reject the request with the
  // same error code as an endpoint mismatch instead.
  if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, neither selfIndex nor learnerSelfIndex is set in request, %s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    // Log before releasing the request: pReplica points into `req`.
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    code = terrno;
    dError("vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgrName[0] != '\0' && strlen(tsDataKey) == 0) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
    dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // An existing vnode is reported as success to the requester (the original code returned
    // 0 here as well; its assignment of TSDB_CODE_VND_ALREADY_EXIST to `code` was dead).
    return 0;
  }

  // NOTE(review): when pVnode is non-NULL here (a failed vnode with replica > 1) it is only
  // released on the vnodeCreate() failure path below — confirm the reference is dropped
  // elsewhere on the other paths.

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    // Roll back: drop the management entry, close the implementation and remove its files.
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
479

480
#ifdef USE_MOUNT
481
// Sort/lookup record used by the mount code: a (dbId, vgId) pair together with the
// diskPrimary value recorded for that vgroup. compareVgDiskPrimary() orders these records
// by dbId, then vgId.
typedef struct {
  int64_t dbId;         // primary sort key
  int32_t vgId;         // secondary sort key
  int32_t diskPrimary;  // payload; not part of the ordering
} SMountDbVgId;
486
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
487
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
488

489
static int compareVnodeInfo(const void *p1, const void *p2) {
2,926✔
490
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
2,926✔
491
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
2,926✔
492

493
  if (v1->config.dbId == v2->config.dbId) {
2,926✔
494
    if (v1->config.vgId == v2->config.vgId) {
1,672✔
495
      return 0;
×
496
    }
497
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
1,672✔
498
  }
499

500
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
1,254✔
501
}
502
static int compareVgDiskPrimary(const void *p1, const void *p2) {
2,926✔
503
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
2,926✔
504
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
2,926✔
505

506
  if (v1->dbId == v2->dbId) {
2,926✔
507
    if (v1->vgId == v2->vgId) {
1,672✔
508
      return 0;
×
509
    }
510
    return v1->vgId > v2->vgId ? 1 : -1;
1,672✔
511
  }
512

513
  return v1->dbId > v2->dbId ? 1 : -1;
1,254✔
514
}
515

516
// Read the dnode-level information of a mount path into pMountInfo.
//
// Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount when the dnode
//        is marked dropped, has a non-zero encryptScope, or has clusterId 0; otherwise the
//        clusterId is stored in pMountInfo->clusterId.
// Step 2 obtains the disk list via vmGetMountDisks() and appends a heap copy of every disk
//        directory to pMountInfo->pDisks[level]; the primary disk is put at the front of
//        level 0 when that level already has entries.
//
// Returns 0 on success or an error code. The local file handle, JSON tree, file buffer and
// temporary disk list are released on every path; whatever was already added to
// pMountInfo->pDisks is left in place for the caller.
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;
  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // A short read may leave terrno at 0. Always bail out, otherwise the uninitialised tail
    // of `content` would be handed to the JSON parser.
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_INVALID_JSON_FORMAT);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;
  // Release the step-1 resources early; the pointers are cleared so _exit does not free twice.
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);
  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
    // The level comes from the mounted configuration and is used as an index into
    // pMountInfo->pDisks[] (TFS_MAX_TIERS entries, see the loop below): validate it first.
    if (pDisk->level < 0 || pDisk->level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d of dir:%s", pReq->mountName, (int32_t)pDisk->level, pDisk->dir);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
    if (!pMountInfo->pDisks[pDisk->level]) {
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[pDisk->level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  // Level 0 needs at least one disk; no level may exceed TFS_MAX_DISKS_PER_TIER.
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
613

614
/**
 * Load the configuration of every vnode listed under `<mountPath>/vnode` and group the vgroups by database.
 *
 * Steps, in order:
 *  1. read the vnode list with vmGetVnodeListFromFile();
 *  2. for each vnode not flagged as dropped: when more than one level-0 disk is recorded, rebuild its path from
 *     the level-0 disk selected by `diskPrimary`; require exist/read/write access; load its info with
 *     vnodeLoadInfo(); reject replicaNum > 1, a vgId mismatch, or any non-empty encrypt algorithm name;
 *  3. sort the loaded infos (compareVnodeInfo) and the (dbId, vgId, diskPrimary) triples (compareVgDiskPrimary);
 *     the two arrays are indexed in step afterwards, so both comparators presumably order by the same key;
 *  4. check that every vnode carries pMountInfo->clusterId and that no vgId repeats in consecutive entries;
 *  5. build one SMountDbInfo per database holding its SMountVgInfo entries.
 *
 * @param pMgmt      not used by this function.
 * @param pReq       request; mountName, mountPath and dnodeId are read.
 * @param pMountInfo in: pDisks[0] and clusterId; out: pDbs (NULL when no vnode was loaded).
 * @return 0 on success, otherwise an error code. pMountInfo->pDbs is only assigned when the end of the function
 *         is reached without jumping to the error exit.
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;

  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  // pVgCfgs is created with numOfVnodes slots up front; only the first nVgLoaded of them get filled below.
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, nVgLoaded = 0;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char **ppDiskDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!ppDiskDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *ppDiskDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, nVgLoaded++);
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // Dropped vnodes never consumed a slot, so the unused slots are exactly the trailing
    // [nVgLoaded, nVgLoaded + nVgDropped) range; remove those and nothing else.
    taosArrayRemoveBatch(pVgCfgs, nVgLoaded, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  // Validate the sorted infos and count the distinct databases (a change of dbId starts a new database).
  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    if (dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    if (vgId == pInfo->config.vgId) {
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    int32_t dbIdx = -1;
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .isAudit = pVgCfg->config.isAudit,
          .allowDrop = pVgCfg->config.allowDrop,
          .secureDelete = pVgCfg->config.secureDelete,
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  // Hand the database list over to the caller's structure; from here on this function no longer owns it.
  pMountInfo->pDbs = pDbInfos;
  pDbInfos = NULL;

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  }
  if (pDbInfos != NULL) {
    // Only reached when building the database list was abandoned half-way: release the per-db vgroup arrays
    // that were already created (slots not yet visited still hold a NULL pVgs) and then the list itself.
    int32_t nDbInfo = taosArrayGetSize(pDbInfos);
    for (int32_t i = 0; i < nDbInfo; ++i) {
      SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
      taosArrayDestroy(pDbInfo->pVgs);
      pDbInfo->pVgs = NULL;
    }
    taosArrayDestroy(pDbInfos);
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
789

790
/**
791
 *   Retrieve the stables from vnode meta.
792
 */
793
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
276✔
794
  int32_t code = 0, lino = 0;
276✔
795
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
276✔
796
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
276✔
797
  SArray *suidList = NULL;
276✔
798
  SArray *pCols = NULL;
276✔
799
  SArray *pTags = NULL;
276✔
800
  SArray *pColExts = NULL;
276✔
801
  SArray *pTagExts = NULL;
276✔
802

803
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
276✔
804
  for (int32_t i = 0; i < nDb; ++i) {
828✔
805
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
552✔
806
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
552✔
807
    for (int32_t j = 0; j < nVg; ++j) {
552✔
808
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
552✔
809
      SVnode        vnode = {
552✔
810
                 .config.vgId = pVgInfo->vgId,
552✔
811
                 .config.dbId = pVgInfo->dbId,
552✔
812
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
552✔
813
                 .config.szPage = pVgInfo->szPage,
552✔
814
                 .config.szCache = pVgInfo->szCache,
552✔
815
                 .config.szBuf = pVgInfo->szBuf,
552✔
816
                 .config.cacheLast = pVgInfo->cacheLast,
552✔
817
                 .config.standby = pVgInfo->standby,
552✔
818
                 .config.hashMethod = pVgInfo->hashMethod,
552✔
819
                 .config.hashBegin = pVgInfo->hashBegin,
552✔
820
                 .config.hashEnd = pVgInfo->hashEnd,
552✔
821
                 .config.hashPrefix = pVgInfo->hashPrefix,
552✔
822
                 .config.hashSuffix = pVgInfo->hashSuffix,
552✔
823
                 .config.sttTrigger = pVgInfo->sttTrigger,
552✔
824
                 .config.syncCfg.replicaNum = pVgInfo->replications,
552✔
825
                 .config.tsdbCfg.precision = pVgInfo->precision,
552✔
826
                 .config.tsdbCfg.compression = pVgInfo->compression,
552✔
827
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
552✔
828
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
552✔
829
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
552✔
830
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
552✔
831
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
552✔
832
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
552✔
833
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
552✔
834
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
552✔
835
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
552✔
836
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
552✔
837
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
552✔
838
                 .config.ssCompact = pVgInfo->ssCompact,
552✔
839
                 .config.isAudit = pVgInfo->isAudit,
552✔
840
                 .config.allowDrop = pVgInfo->allowDrop,
552✔
841
                 .config.secureDelete = pVgInfo->secureDelete,
552✔
842
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
552✔
843
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
552✔
844
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
552✔
845
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
552✔
846
                 .config.walCfg.segSize = pVgInfo->walSegSize,
552✔
847
                 .config.walCfg.level = pVgInfo->walLevel,
552✔
848
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
849
                 .diskPrimary = pVgInfo->diskPrimary,
552✔
850
      };
851
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
552✔
852
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
552✔
853
               pVgInfo->vgId);
854
      vnode.path = path;
552✔
855

856
      int32_t rollback = vnodeShouldRollback(&vnode);
552✔
857
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
552✔
858
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
859
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
860
        TAOS_CHECK_EXIT(code);
×
861
      } else {
862
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
552✔
863
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
864

865
        SMetaReader mr = {0};
552✔
866
        tb_uid_t    suid = 0;
552✔
867
        SMeta      *pMeta = vnode.pMeta;
552✔
868

869
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
552✔
870
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
552✔
871
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
872
        }
873
        taosArrayClear(suidList);
552✔
874
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
552✔
875
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
552✔
876
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
877
        int32_t nStbs = taosArrayGetSize(suidList);
552✔
878
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
552✔
879
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
880
        }
881
        for (int32_t i = 0; i < nStbs; ++i) {
2,208✔
882
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
1,656✔
883
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,656✔
884
                pVgInfo->dbId, suid, pReq->dnodeId);
885
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
1,656✔
886
            TSDB_CHECK_CODE(code, lino, _exit0);
×
887
          }
888
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
1,656✔
889
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
1,656✔
890
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
891
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
892
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
893
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
894
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
895
          }
896
          SMountStbInfo stbInfo = {
1,656✔
897
              .req.source = TD_REQ_FROM_APP,
898
              .req.suid = suid,
899
              .req.colVer = mr.me.stbEntry.schemaRow.version,
1,656✔
900
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
1,656✔
901
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
1,656✔
902
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
1,656✔
903
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
1,656✔
904
          };
905
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
1,656✔
906
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
1,656✔
907
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
908
          }
909
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
1,656✔
910
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
911
          }
912

913
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
1,656✔
914
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
915
          }
916
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
1,656✔
917
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
918
          }
919
          taosArrayClear(pCols);
1,656✔
920
          taosArrayClear(pTags);
1,656✔
921
          taosArrayClear(pColExts);
1,656✔
922
          taosArrayClear(pTagExts);
1,656✔
923
          stbInfo.req.pColumns = pCols;
1,656✔
924
          stbInfo.req.pTags = pTags;
1,656✔
925
          stbInfo.pColExts = pColExts;
1,656✔
926
          stbInfo.pTagExts = pTagExts;
1,656✔
927

928
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
9,936✔
929
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
8,280✔
930
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
8,280✔
931
            SFieldWithOptions col = {
8,280✔
932
                .type = pSchema->type,
8,280✔
933
                .flags = pSchema->flags,
8,280✔
934
                .bytes = pSchema->bytes,
8,280✔
935
                .compress = pColComp->alg,
8,280✔
936
            };
937
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
8,280✔
938
            if (pSchema->colId != pColComp->id) {
8,280✔
939
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
940
            }
941
            if (mr.me.pExtSchemas) {
8,280✔
942
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
943
            }
944
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
8,280✔
945
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
16,560✔
946
          }
947
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
3,864✔
948
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
2,208✔
949
            SField   tag = {
2,208✔
950
                  .type = pSchema->type,
2,208✔
951
                  .flags = pSchema->flags,
2,208✔
952
                  .bytes = pSchema->bytes,
2,208✔
953
            };
954
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
2,208✔
955
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
2,208✔
956
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
4,416✔
957
          }
958
          tDecoderClear(&mr.coder);
1,656✔
959

960
          // serialize the SMountStbInfo
961
          int32_t firstPartLen = 0;
1,656✔
962
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
1,656✔
963
          if (msgLen <= 0) {
1,656✔
964
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
965
          }
966
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
1,656✔
967
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
1,656✔
968
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
1,656✔
969
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
1,656✔
970
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
1,656✔
971
            taosMemoryFree(pBuf);
×
972
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
973
          }
974
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
3,312✔
975
            taosMemoryFree(pBuf);
×
976
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
977
          }
978
        }
979
      _exit0:
552✔
980
        metaReaderClear(&mr);
552✔
981
        metaClose(&vnode.pMeta);
552✔
982
        TAOS_CHECK_EXIT(code);
552✔
983
      }
984
      break;  // retrieve stbs from one vnode is enough
552✔
985
    }
986
  }
987
_exit:
276✔
988
  if (code != 0) {
276✔
989
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
990
           pReq->dnodeId, tstrerror(code), path);
991
  }
992
  taosArrayDestroy(suidList);
276✔
993
  taosArrayDestroy(pCols);
276✔
994
  taosArrayDestroy(pTags);
276✔
995
  taosArrayDestroy(pColExts);
276✔
996
  taosArrayDestroy(pTagExts);
276✔
997
  TAOS_RETURN(code);
276✔
998
}
999

1000
/**
 * Open (creating/truncating) `<mountPath>/.running` and try to lock it.
 *
 * The lock is attempted at least once. After every failed attempt the function sleeps 1000 ms, logs the
 * failure, and gives up once the number of failed attempts reaches `retryLimit`.
 *
 * @param mountName  mount name, used in log messages only.
 * @param mountPath  directory in which the `.running` file lives.
 * @param pFile      out: the opened file on success; closed and reset to NULL on failure.
 * @param retryLimit number of failed lock attempts after which the function stops retrying.
 * @return 0 on success; otherwise terrno from the open or the result of the last taosLockFile() call.
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0, lino = 0;
  int32_t lockRet = 0;
  int32_t nFailed = 0;
  char    lockPath[PATH_MAX] = {0};

  (void)snprintf(lockPath, sizeof(lockPath), "%s%s.running", mountPath, TD_DIRSEP);

  *pFile = taosOpenFile(lockPath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC);
  TSDB_CHECK_NULL(*pFile, code, lino, _exit, terrno);

  for (;;) {
    lockRet = taosLockFile(*pFile);
    if (lockRet == 0) {
      break;
    }
    taosMsleep(1000);
    nFailed += 1;
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, lockPath, tstrerror(lockRet), nFailed);
    if (nFailed >= retryLimit) {
      break;
    }
  }
  TAOS_CHECK_EXIT(lockRet);

_exit:
  if (code != 0) {
    // Do not hand a half-usable handle back to the caller.
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           lockPath);
  }
  TAOS_RETURN(code);
}
1026

1027
/**
 * Sanity-check a mount path before any data is read from it.
 *
 * Checks, in order: the mount root is accessible; the `.running` lock can be taken (vmMountCheckRunning with a
 * retry limit of 3, the opened file is stored in pMountInfo->pFile); and these entries are accessible under
 * the mount root: `<dnode>/dnode.json`, the mnode directory, the vnode directory and
 * `<dnode>/config/local.json` (directory names come from dmNodeName()).
 *
 * @param pMgmt      not used by this function.
 * @param pReq       request; mountName, mountPath and dnodeId are read.
 * @param pMountInfo out: pFile receives the lock file opened by vmMountCheckRunning().
 * @return 0 when every check passes; otherwise the system error of the failed access check or the error
 *         returned by vmMountCheckRunning().
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};

  // Seed `path` with the mount root so the error log at _exit names the offending path even when the very
  // first check or the lock step fails (it would otherwise print an empty string).
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);

  // NOTE(review): O_RDONLY is an open(2) flag, while other callers in this file pass TD_FILE_ACCESS_* masks
  // to taosCheckAccessFile(); confirm the intended access level before changing it.
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));

  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
           TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1048

1049
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
844✔
1050
                                       SMountInfo *pMountInfo) {
1051
  int32_t code = 0, lino = 0;
844✔
1052
  pMountInfo->dnodeId = pReq->dnodeId;
844✔
1053
  pMountInfo->mountUid = pReq->mountUid;
844✔
1054
  (void)snprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
844✔
1055
  (void)snprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
844✔
1056
  pMountInfo->ignoreExist = pReq->ignoreExist;
844✔
1057
  pMountInfo->valLen = pReq->valLen;
844✔
1058
  pMountInfo->pVal = pReq->pVal;
844✔
1059
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
844✔
1060
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
418✔
1061
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
418✔
1062
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
276✔
1063
_exit:
276✔
1064
  if (code != 0) {
844✔
1065
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
568✔
1066
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1067
  }
1068
  TAOS_RETURN(code);
844✔
1069
}
1070

1071
/**
 * Handle a "retrieve mount path" request.
 *
 * The request is decoded and processed by vmRetrieveMountPathImpl() against a local copy of `*pMgmt` whose
 * `path` is reset to NULL. Whether or not that succeeded, the (possibly partial) SMountInfo is serialized
 * into an RPC buffer and attached to pMsg->info as the response. Two error codes are tracked:
 *  - `code`:    failure of decoding/retrieval; a response is still attached.
 *  - `rspCode`: failure to build the response itself; no response is attached and it overrides `code`.
 *
 * @param pMgmt vnode management context; copied by value, the original is not modified here.
 * @param pMsg  in: pCont/contLen hold the serialized request; out: info.rsp/info.rspLen on success of the
 *              response build.
 * @return 0 on success, otherwise `rspCode` if building the response failed, else `code`.
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0, lino = 0;
  int32_t               rspCode = 0;
  int32_t               bufLen = 0;
  void                 *pBuf = NULL;
  SRetrieveMountPathReq req = {0};
  SMountInfo            mountInfo = {0};
  SVnodeMgmt            vndMgmt = *pMgmt;

  vndMgmt.path = NULL;

  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // Size the payload, allocate the RPC buffer, then serialize for real.
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
  pMsg->info.rsp = pBuf;
  pMsg->info.rspLen = bufLen;

_exit:
  if (rspCode != 0) {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pBuf);
    code = rspCode;
  } else if (code != 0) {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  } else {
    // Success: report how many databases and vgroups were found.
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
    int32_t nVgs = 0;
    int32_t dbIdx = 0;
    while (dbIdx < nDbs) {
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, dbIdx);
      nVgs += taosArrayGetSize(pDb->pVgs);
      ++dbIdx;
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
  }

  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1115

1116
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
1,104✔
1117
                            SMountVnodeReq *req, STfs *pMountTfs) {
1118
  int32_t    code = 0;
1,104✔
1119
  SVnodeInfo info = {0};
1,104✔
1120
  char       hostDir[TSDB_FILENAME_LEN] = {0};
1,104✔
1121
  char       mountDir[TSDB_FILENAME_LEN] = {0};
1,104✔
1122
  char       mountVnode[32] = {0};
1,104✔
1123

1124
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
1,104✔
1125
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1126
    return code;
×
1127
  }
1128

1129
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
1,104✔
1130
  if ((code = taosMkDir(hostDir))) {
1,104✔
1131
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1132
           tstrerror(code), hostDir);
1133
    return code;
×
1134
  }
1135

1136
  info.config = *pCfg;  // copy the config
1,104✔
1137
  info.state.committed = req->committed;
1,104✔
1138
  info.state.commitID = req->commitID;
1,104✔
1139
  info.state.commitTerm = req->commitTerm;
1,104✔
1140
  info.state.applied = req->committed;
1,104✔
1141
  info.state.applyTerm = req->commitTerm;
1,104✔
1142
  info.config.vndStats.numOfSTables = req->numOfSTables;
1,104✔
1143
  info.config.vndStats.numOfCTables = req->numOfCTables;
1,104✔
1144
  info.config.vndStats.numOfNTables = req->numOfNTables;
1,104✔
1145

1146
  SVnodeInfo oldInfo = {0};
1,104✔
1147
  oldInfo.config = vnodeCfgDefault;
1,104✔
1148
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
1,104✔
1149
    if (oldInfo.config.dbId != info.config.dbId) {
×
1150
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1151
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1152
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1153
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1154
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1155
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1156

1157
    } else {
1158
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1159
    }
1160
    return code;
×
1161
  }
1162

1163
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
1,104✔
1164
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
1,104✔
1165
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
1,104✔
1166
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
1,104✔
1167
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1168
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
6,624✔
1169
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
5,520✔
1170
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
5,520✔
1171
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
5,520✔
1172
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1173
             mountSubDir, hostSubDir, tstrerror(code));
1174
      return code;
×
1175
    }
1176
  }
1177
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
1,104✔
1178
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
1,104✔
1179
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1180
           req->mountName, tstrerror(code), hostDir);
1181
    return code;
×
1182
  }
1183
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
1,104✔
1184
  return 0;
1,104✔
1185
}
1186

1187
/**
 * Handle a mount-vnode request.
 *
 * Deserializes the request, checks that the addressed replica is this dnode,
 * waits for the encryption key to be loaded, then mounts, opens and starts
 * the vnode and persists the vnode and mount lists. On any failure after the
 * mount tfs was acquired, the partially created vnode is closed and destroyed
 * and the mount tfs is released.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SMountVnodeReq; its code and
 *              rsp fields are filled in once the `_exit` section is reached
 * @return 0 on success (and when the vnode already exists); otherwise an error code
 */
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // print the learner's own dnode id, not the voter replica's id at the same index
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  SReplica *pReplica = NULL;
  if (pCreateReq->selfIndex != -1) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // log before freeing: pReplica and the logged fields live inside req
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    // capture terrno and log before the request is freed
    code = terrno;
    dError("mount:%s, vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.mountName,
           pCreateReq->vgId, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): an existing vnode is reported as success. The original stored
    // TSDB_CODE_VND_ALREADY_EXIST in `code` but still returned 0; confirm that the
    // caller is not supposed to see that error code.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;  // from here on the failure path must release the mount tfs

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1289
#endif  // USE_MOUNT
1290

1291
// alter replica doesn't use this, but restore dnode still uses this
1292
/**
 * Handle an alter-vnode-type request.
 *
 * The request is rejected when the vnode does not exist, already has the
 * voter role, or has not caught up yet. Otherwise the vnode is closed, its
 * replica configuration is rewritten on disk via vnodeAlterReplica, and it is
 * reopened and started.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeTypeReq
 * @return 0 on success; -1 on failure (terrno is assigned on the validation
 *         failures handled in this function)
 */
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count. `learnerReplicas` is the replica array, which never compares equal to 0.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // nothing was acquired, so there is nothing to release
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // the self index must address an entry of either the replica or the learner array
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // snapshot the wrapper fields before the vnode object is closed
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
1411

1412
/**
 * Handle a check-learner-catchup request: report whether the vnode is a
 * non-voter that has caught up.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SCheckLearnCatchupReq
 * @return 0 when the vnode exists, is not a voter and has caught up;
 *         -1 otherwise, with terrno set to the reason
 */
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count. `learnerReplicas` is the replica array, which never compares equal to 0.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // nothing was acquired, so there is nothing to release
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1459

1460
/**
 * Handle a disable-vnode-write request by copying the requested flag onto the vnode object.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SDisableVnodeWriteReq
 * @return 0 on success; -1 with terrno set when the message is invalid or the
 *         vnode cannot be acquired
 */
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // nothing was acquired, so there is nothing to release
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->disable = req.disable;
  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1481

1482
/**
 * Handle a set-keep-version request by applying the WAL keep version directly
 * on the local vnode.
 *
 * The message body is an SMsgHead (network byte order) followed by a
 * serialized SVndSetKeepVersionReq. The header fields are converted to host
 * byte order in place.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message; pCont must hold at least an SMsgHead
 * @return 0 on success; -1 with terrno set on failure
 */
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // A body shorter than the header would be read out of bounds below, and
  // `contLen - sizeof(SMsgHead)` would wrap around to a huge unsigned length.
  if (pMsg->pCont == NULL || (int64_t)pMsg->contLen < (int64_t)sizeof(SMsgHead)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVndSetKeepVersionReq req = {0};
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
                                        &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
  // This bypasses Raft to avoid timing issues where WAL might be deleted
  // before keepVersion is set through the Raft consensus process
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
  if (code != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
    terrno = code;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);

  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1519

1520
/**
 * Handle an alter-hash-range request: turn vnode srcVgId into vnode dstVgId
 * with a new hash range.
 *
 * The destination vgId must not exist yet and the source must exist. The
 * source is tagged with its target vgId and the vnode list is persisted
 * ("prepare"). The source is then closed, vnodeAlterHashRange rewrites it at
 * the destination path, the destination is opened and started, and the vnode
 * list is persisted again ("complete").
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeHashRangeReq
 * @return 0 on success; -1 on failure (terrno is assigned on the validation
 *         failures handled in this function)
 */
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    // nothing was acquired, so there is nothing to release
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // snapshot the wrapper fields before the source vnode object is closed
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // the source vnode is still open here, so give back the reference acquired above
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1605

1606
/**
 * Handle an alter-vnode-replica request.
 *
 * Validates the replica layout and that the addressed replica is this dnode,
 * then closes the vnode, rewrites its replica configuration on disk via
 * vnodeAlterReplica, and reopens and starts it.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeReplicaReq
 * @return 0 on success; -1 on failure (terrno is assigned on the validation
 *         failures handled in this function)
 */
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // the `dnode:%d` field is the replica's dnode id, not its port
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // the self index must address an entry of either the replica or the learner array
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // nothing was acquired, so there is nothing to release
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // snapshot the wrapper fields before the vnode object is closed
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1709

1710
/**
 * Handle an alter-elect-baseline request by forwarding the requested value to
 * vnodeSetElectBaseline on the local vnode.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SAlterVnodeElectBaselineReq
 * @return 0 on success; TSDB_CODE_INVALID_MSG when the message cannot be
 *         deserialized; terrno when the vnode cannot be acquired; -1 when
 *         vnodeSetElectBaseline fails
 */
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeElectBaselineReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.electBaseLine);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // name the operation that actually failed (the old text said "alter replica")
    dError("vgId:%d, failed to alter elect-base-line since %s", vgId, terrstr());
    return terrno;
  }

  if (vnodeSetElectBaseline(pVnode->pImpl, alterReq.electBaseLine) != 0) {
    // this failure used to be silent; the return value is unchanged
    dError("vgId:%d, failed to set elect-base-line to %d", vgId, alterReq.electBaseLine);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1735

1736
/**
 * Handle a drop-vnode request.
 *
 * The vnode is flagged as dropped and the vnode list is persisted before the
 * vnode is closed. If that first write fails the flag is cleared again and
 * the request fails. A failure of the second write, after the close, is only
 * logged.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SDropVnodeReq
 * @return 0 on success; otherwise an error code
 */
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq dropReq = {0};

  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  const int32_t vgId = dropReq.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  // the request must be addressed to this dnode
  if (pMgmt->pData->dnodeId != dropReq.dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
    return terrno;
  }

  SVnodeObj *pTarget = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (!pTarget) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // persist the dropped flag first; undo it when the list cannot be written
  pTarget->dropped = 1;
  int32_t rc = vmWriteVnodeListToFile(pMgmt);
  if (rc != 0) {
    pTarget->dropped = 0;
    vmReleaseVnode(pMgmt, pTarget);
    return rc;
  }

  vmCloseVnode(pMgmt, pTarget, false, false);

  // a failure to persist the list after the close is reported but not returned
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
1775

1776
/**
 * Handle an arbitrator heartbeat request.
 *
 * For every member in the request whose vnode can be acquired, the vnode's
 * arb token is read and its arb term is updated. Members whose vnode is
 * missing, or for which either call fails, are skipped. The collected members
 * are serialized into the response attached to pMsg->info.
 *
 * @param pMgmt vnode management object
 * @param pMsg  rpc message carrying a serialized SVArbHeartBeatReq; on success
 *              info.rsp / info.rspLen receive the serialized response
 * @return -1 (terrno = TSDB_CODE_INVALID_MSG) when the request cannot be
 *         deserialized; otherwise the result code, which is also stored in terrno
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // The result is tracked in a local so that the cleanup calls at _OVER cannot change it.
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           rspLen = 0;
  void             *pRsp = NULL;
  size_t            size = 0;
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(code));
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    code = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    code = terrno;
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      code = terrno;  // read terrno before any further call can touch it
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // first pass computes the encoded size, second pass fills the buffer
  rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  terrno = code;
  return code;
}
1865

1866
SArray *vmGetMsgHandles() {
632,822✔
1867
  int32_t code = -1;
632,822✔
1868
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
632,822✔
1869
  if (pArray == NULL) goto _OVER;
632,822✔
1870

1871
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1872
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
632,822✔
1873
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
632,822✔
1874
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
632,822✔
1875
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1876
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1877
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1878
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1881
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1882
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1883
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1884
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1885
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1886
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1887
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1891
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1892
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1893
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1894
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1895
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1896
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1899
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1900
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
632,822✔
1901
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1902
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1903
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1904
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1905
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1906
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1907
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1908
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1909
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1910
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1911
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1912
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1913
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1914
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1915
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1916
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1917
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1918
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1919

1920
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1921
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1922
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1923
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1924
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1925
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1926
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1927
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1928
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1929
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1930
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1931
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
632,822✔
1932
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1933
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1934
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1935
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1936
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1937
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1938
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1939
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1940
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1941
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1942
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1943

1944
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1945
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1946
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1947
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1948
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1949
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1950
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1951
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1952
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1953
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1954
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1955
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1956

1957
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
632,822✔
1958
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
632,822✔
1959
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
632,822✔
1960
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
632,822✔
1961
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
632,822✔
1962

1963
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
632,822✔
1964
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1965
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
632,822✔
1966

1967
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
632,822✔
1968
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
632,822✔
1969
  if (dmSetMgmtHandle(pArray, TDMT_VND_AUDIT_RECORD, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
632,822✔
1970

1971
  code = 0;
632,822✔
1972

1973
_OVER:
632,822✔
1974
  if (code != 0) {
632,822✔
1975
    taosArrayDestroy(pArray);
×
1976
    return NULL;
×
1977
  } else {
1978
    return pArray;
632,822✔
1979
  }
1980
}
1981

1982
/*
 * Collect the raw write metrics of every running, non-failed vnode and add
 * them to the global metrics system, tagged with the given cluster id.
 *
 * The running-vnode hash is walked under the read side of pMgmt->hashLock.
 * For each vnode the metrics are fetched, published through addWriteMetrics,
 * and, only if publishing succeeded, reset on the vnode. Any per-vnode failure
 * is logged and the walk moves on to the next vnode.
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-clause so that every `continue` below
  // moves on to the next entry instead of re-processing the same one forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    // Parse the vnode's configured db name into its components
    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    // Add the metrics to the global metrics system with cluster ID
    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // Only reset the vnode's counters once the values have been published
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc