• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4988

16 Mar 2026 12:26PM UTC coverage: 75.821% (+1.9%) from 73.883%
#4988

push

travis-ci

web-flow
feat: support secure delete option. (#34591)

274 of 464 new or added lines in 29 files covered. (59.05%)

4404 existing lines in 23 files now uncovered.

337108 of 444611 relevant lines covered (75.82%)

146708292.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.7
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22
#include "tencrypt.h"
23

24
extern taos_counter_t *tsInsertCounter;
25

26
// Forward declaration for function defined in metrics.c
27
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
28
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
29

30
/*
 * Build the per-vnode load list for this dnode.
 *
 * pMgmt   - vnode management object whose running hash is walked.
 * pInfo   - output; pInfo->pVloads receives a newly created array of SVnodeLoad
 *           (left NULL when the array cannot be allocated). The caller destroys it.
 * isReset - when true, vnodeResetLoad() is called for every non-failed vnode after
 *           its load has been read.
 *
 * Failed vnodes still contribute an entry that carries only their vgId.
 * The running hash is walked under the read side of pMgmt->hashLock.
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header so that `continue` can never
  // re-visit the same entry (the previous while-loop spun forever in that case
  // while holding the read lock).
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
59

60
/*
 * Apply tsVnodeElectIntervalMs as the sync timeout of every vnode in the running hash.
 *
 * A failure on one vnode is logged and does not stop the walk.
 * The running hash is walked under the read side of pMgmt->hashLock.
 */
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advance in the for-header: a `continue` inside the body must still move to the
  // next entry, otherwise the loop never terminates and the lock is never released.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;

    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
78

79
/*
 * Build the lightweight per-vnode load list for this dnode.
 *
 * pInfo->pVloads receives a newly created array of SVnodeLoadLite. Only vnodes that
 * are not marked failed and for which vnodeGetLoadLite() succeeds are added.
 * If an element cannot be appended, the array is destroyed, pInfo->pVloads is set
 * to NULL and the walk stops; callers must therefore check pInfo->pVloads.
 *
 * The running hash is walked under the read side of pMgmt->hashLock.
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advance in the for-header so `continue` cannot re-visit the same entry forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // Leaving the walk early: hand the iterator back to the hash table.
      taosHashCancelIterate(pMgmt->runngingHash, pIter);
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
106

107
/*
 * Aggregate the load of every vnode into monitor statistics.
 *
 * Loads are fetched with vmGetVnodeLoads(..., true), i.e. with reset requested.
 * The totals are written both to pInfo->vstat and to pMgmt->state, then the tfs
 * monitor info is fetched into pInfo->tfs. Nothing is written when the load list
 * could not be obtained.
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoadList = loadInfo.pVloads;
  if (pLoadList == NULL) return;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pLoadList)) {
    SVnodeLoad *pItem = taosArrayGet(pLoadList, idx);

    selectReqs += pItem->numOfSelectReqs;
    insertReqs += pItem->numOfInsertReqs;
    insertOkReqs += pItem->numOfInsertSuccessReqs;
    batchInsertReqs += pItem->numOfBatchInsertReqs;
    batchInsertOkReqs += pItem->numOfBatchInsertSuccessReqs;

    // both plain and assigned leaders count as masters
    bool isLeader =
        (pItem->syncState == TAOS_SYNC_STATE_LEADER) || (pItem->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      leaderCnt++;
    }

    vnodeCnt++;
    ++idx;
  }

  // values reported to the monitor
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;                      // delta
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;             // delta
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;            // delta
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;   // delta

  // the same values cached on the management object
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoadList);
}
155

156
/*
 * Delete tsInsertCounter samples whose vgroup id is no longer in the running hash.
 *
 * The key / vgroup-id lists are obtained from taos_counter_get_vgroup_ids() and the
 * two list arrays are released here after the scan. Lookups in the running hash are
 * done under the read side of pMgmt->hashLock.
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int sampleCnt = taos_counter_get_keys_size(tsInsertCounter);
  if (sampleCnt == 0) return;

  int32_t *pVgIds;
  char   **ppKeys;
  int      rc = taos_counter_get_vgroup_ids(tsInsertCounter, &ppKeys, &pVgIds, &sampleCnt);
  if (rc) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (int idx = 0; idx < sampleCnt; idx++) {
    int32_t vgId = pVgIds[idx];
    if (taosHashGet(pMgmt->runngingHash, &vgId, sizeof(int32_t)) != NULL) {
      continue;  // vgroup is still running here, keep its sample
    }

    rc = taos_counter_delete(tsInsertCounter, ppKeys[idx]);
    if (rc) {
      dError("failed to delete monitor sample key:%s", ppKeys[idx]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (pVgIds) taosMemoryFree(pVgIds);
  if (ppKeys) taosMemoryFree(ppKeys);
}
183

184
/*
 * Remove metrics that belong to vgroups which are no longer running on this dnode.
 *
 * Does nothing unless monitoring and metrics are enabled and a monitor endpoint
 * (tsMonitorFqdn / tsMonitorPort) is configured. The set of vgIds currently in the
 * running hash is collected under the read side of pMgmt->hashLock and handed to
 * cleanupExpiredMetrics(); the temporary set is released before returning.
 */
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Allocate the temporary set before touching the running hash: a failed allocation
  // then needs neither unlocking nor an abandoned iterator to be cleaned up.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    dError("failed to init valid vgroups hash");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    int32_t vgId = (*ppVnode)->vgId;
    char    dummy = 1;  // hash table value (we only care about the key)
    if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
      dError("failed to put vgId:%d to valid vgroups hash", vgId);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
218

219
/*
 * Translate a create-vnode request into a vnode configuration.
 *
 * pCfg is first overwritten with vnodeCfgDefault, then every field carried by the
 * request is copied over. When an encryption algorithm is in effect, tsDataKey is
 * copied into the tsdb, wal and tdb encryption data (enterprise builds only).
 *
 * The sync node table is laid out as: voters first (pCreate->replicas), learners
 * after them (pCreate->learnerReplicas). syncCfg.replicaNum ends up as the voter
 * count and syncCfg.totalReplicaNum as voters + learners. Entries that would not fit
 * into syncCfg.nodeInfo are dropped and reported in the log.
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // pCfg->tsdbCfg.encryptAlgr = pCreate->encryptAlgr;
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgr = 0;
#endif

  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->walCfg.encryptAlgr = 0;
#endif

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  // pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tdbEncryptAlgr = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  pCfg->isAudit = pCreate->isAudit;
  pCfg->allowDrop = pCreate->allowDrop;
  pCfg->secureDelete = pCreate->secureDelete;

  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Never write past the node table, whatever counts the request carries.
  const int32_t maxNodes = (int32_t)(sizeof(pCfg->syncCfg.nodeInfo) / sizeof(pCfg->syncCfg.nodeInfo[0]));
  if (pCreate->replica + pCreate->learnerReplica > maxNodes) {
    dError("vgId:%d, replica:%d learnerReplica:%d exceed node table size:%d, extra nodes are ignored", pCreate->vgId,
           pCreate->replica, pCreate->learnerReplica, maxNodes);
  }

  // voters: source index == destination index == syncCfg.replicaNum
  for (int32_t i = 0; i < pCreate->replica && i < maxNodes; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
    // the "was anything updated" result is not needed here
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // learners: destination continues after the voters, while totalReplicaNum is used
  // as the index into learnerReplicas until the voter count is added below
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica && i < maxNodes; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
6,195,128✔
326

327
/*
 * Fill the wrapper configuration of a vnode that is about to be created:
 * vgId and vgVersion come from the request, the vnode is marked as not dropped,
 * and the path becomes "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  pCfg->vgId = vgId;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->dropped = 0;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
}
6,196,082✔
333

334
/*
 * Handle a create-vnode request sent to this dnode.
 *
 * Steps: deserialize the request, verify that the replica entry addressed to "self"
 * matches this dnode (id, port, fqdn), wait for the encryption key configuration,
 * build the vnode / wrapper configs, then create, open and start the vnode and
 * persist the vnode list.
 *
 * Returns 0 on success and also when a usable vnode with this vgId already exists;
 * otherwise an error code. On the paths that go through _OVER, terrno is set to the
 * returned code and a partially created vnode is closed and destroyed.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset:%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s, "
      "isAudit:%" PRIu8 " allowDrop:%" PRIu8,
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm, req.encryptAlgrName, req.isAudit, req.allowDrop);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // the dnode id must come from the learner list as well, not from the voter list
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Locate the replica entry that describes this dnode: a voter slot if selfIndex is
  // set, otherwise a learner slot. With neither set there is nothing to index.
  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else if (req.learnerSelfIndex != -1) {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  } else {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, neither selfIndex nor learnerSelfIndex is set in request, %s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    // log before releasing the request: pReplica points into req
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    code = terrno;
    dError("vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgrName[0] != '\0' && strlen(tsDataKey) == 0) {
    code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
    dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    // An existing vnode is logged as an error but reported to the caller as success.
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    return 0;
  }
  // NOTE(review): when pVnode is non-NULL here (failed vnode, replica > 1) it is only
  // released on the vnodeCreate failure path below - confirm whether the other paths
  // are expected to release it too.

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    // roll back whatever was created so far
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
478

479
#ifdef USE_MOUNT
480
/* Sort key used for mounted vnodes: ordered by dbId first, then by vgId. */
typedef struct {
  int64_t dbId;         /* database id, primary sort key */
  int32_t vgId;         /* vgroup id, secondary sort key */
  int32_t diskPrimary;  /* primary disk index recorded for this vgroup */
} SMountDbVgId;
485
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
486
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
487

488
static int compareVnodeInfo(const void *p1, const void *p2) {
6,524✔
489
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
6,524✔
490
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
6,524✔
491

492
  if (v1->config.dbId == v2->config.dbId) {
6,524✔
493
    if (v1->config.vgId == v2->config.vgId) {
3,728✔
UNCOV
494
      return 0;
×
495
    }
496
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
3,728✔
497
  }
498

499
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
2,796✔
500
}
501
static int compareVgDiskPrimary(const void *p1, const void *p2) {
6,524✔
502
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
6,524✔
503
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
6,524✔
504

505
  if (v1->dbId == v2->dbId) {
6,524✔
506
    if (v1->vgId == v2->vgId) {
3,728✔
UNCOV
507
      return 0;
×
508
    }
509
    return v1->vgId > v2->vgId ? 1 : -1;
3,728✔
510
  }
511

512
  return v1->dbId > v2->dbId ? 1 : -1;
2,796✔
513
}
514

515
/*
 * Read the dnode-level information of a mount path into pMountInfo.
 *
 * Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount when the
 *        dnode is marked dropped, when encryptScope is non-zero, or when clusterId
 *        is 0; otherwise pMountInfo->clusterId is set.
 * Step 2 obtains the disk list through vmGetMountDisks() and stores a duplicated
 *        directory string per disk in pMountInfo->pDisks[level]; the primary disk is
 *        placed at the front of level 0.
 *
 * Returns 0 on success or an error code. Temporary resources (file, buffer, JSON
 * tree, disk list) are released on every path; entries already appended to
 * pMountInfo->pDisks are left in place on failure.
 */
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;
  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // A short read may leave terrno at 0; never continue with a partly filled buffer.
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_INVALID_JSON_FORMAT);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;
  // release the step-1 resources early; _exit tolerates them being NULL
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);
  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
    // the level comes from a file on the mounted path: validate it before using it
    // as an index into pMountInfo->pDisks
    int32_t level = (int32_t)pDisk->level;
    if (level < 0 || level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d of dir:%s", pReq->mountName, level, pDisk->dir);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
    if (!pMountInfo->pDisks[level]) {
      pMountInfo->pDisks[level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  // level 0 needs at least one disk; no level may exceed the per-tier limit
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
612

613
/**
 * Collect the vnode configurations found under a mount path and group them by database.
 *
 * Reads the vnode list from "<mountPath>/<vnode dir>", loads the info of every vnode that is not
 * marked dropped and validates it (at most one replica, vgId matching the list entry, no encryption
 * algorithm configured, clusterId matching pMountInfo->clusterId, no repeated vgId). On success an
 * array of SMountDbInfo, each holding its SMountVgInfo entries, is stored in pMountInfo->pDbs.
 *
 * @param pMgmt       vnode management handle (not referenced by this function)
 * @param pReq        retrieve request; supplies mountName, mountPath and dnodeId
 * @param pMountInfo  in: clusterId and pDisks[0]; out: pDbs (assigned only on success)
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;

  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, j = 0;  // j: number of pVgCfgs slots filled so far
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char **ppDiskDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!ppDiskDir) {
        TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      }
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *ppDiskDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // pVgCfgs was created with numOfVnodes slots but only the first j were filled (dropped vnodes are
    // skipped above), so trim the nVgDropped unused slots at the tail: [j, j + nVgDropped).
    taosArrayRemoveBatch(pVgCfgs, j, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    // NOTE(review): the grouping below pairs pVgCfgs[i] with pDiskPrimarys[i] and counts a new db whenever
    // dbId changes, so both comparators are presumably ordering by (dbId, vgId) - confirm.
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  // first pass: validate the clusterId, reject repeated vgIds and count the distinct databases
  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    if (dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    if (vgId == pInfo->config.vgId) {
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  // second pass: build one SMountDbInfo per database and append its vgroups
  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    int32_t dbIdx = -1;
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .isAudit = pVgCfg->config.isAudit,
          .allowDrop = pVgCfg->config.allowDrop,
          .secureDelete = pVgCfg->config.secureDelete,
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  pMountInfo->pDbs = pDbInfos;  // ownership moves to pMountInfo on success only

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
    // pDbInfos was not handed over to pMountInfo on this path: release it together with the per-db vg arrays.
    // Slots that were never filled are zero-initialized, so their pVgs is NULL.
    int32_t nDbInfo = taosArrayGetSize(pDbInfos);
    for (int32_t i = 0; i < nDbInfo; ++i) {
      SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
      taosArrayDestroy(pDbInfo->pVgs);
      pDbInfo->pVgs = NULL;
    }
    taosArrayDestroy(pDbInfos);
    pDbInfos = NULL;
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
788

789
/**
790
 *   Retrieve the stables from vnode meta.
791
 */
792
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
620✔
793
  int32_t code = 0, lino = 0;
620✔
794
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
620✔
795
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
620✔
796
  SArray *suidList = NULL;
620✔
797
  SArray *pCols = NULL;
620✔
798
  SArray *pTags = NULL;
620✔
799
  SArray *pColExts = NULL;
620✔
800
  SArray *pTagExts = NULL;
620✔
801

802
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
620✔
803
  for (int32_t i = 0; i < nDb; ++i) {
1,860✔
804
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
1,240✔
805
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
1,240✔
806
    for (int32_t j = 0; j < nVg; ++j) {
1,240✔
807
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
1,240✔
808
      SVnode        vnode = {
1,240✔
809
                 .config.vgId = pVgInfo->vgId,
1,240✔
810
                 .config.dbId = pVgInfo->dbId,
1,240✔
811
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
1,240✔
812
                 .config.szPage = pVgInfo->szPage,
1,240✔
813
                 .config.szCache = pVgInfo->szCache,
1,240✔
814
                 .config.szBuf = pVgInfo->szBuf,
1,240✔
815
                 .config.cacheLast = pVgInfo->cacheLast,
1,240✔
816
                 .config.standby = pVgInfo->standby,
1,240✔
817
                 .config.hashMethod = pVgInfo->hashMethod,
1,240✔
818
                 .config.hashBegin = pVgInfo->hashBegin,
1,240✔
819
                 .config.hashEnd = pVgInfo->hashEnd,
1,240✔
820
                 .config.hashPrefix = pVgInfo->hashPrefix,
1,240✔
821
                 .config.hashSuffix = pVgInfo->hashSuffix,
1,240✔
822
                 .config.sttTrigger = pVgInfo->sttTrigger,
1,240✔
823
                 .config.syncCfg.replicaNum = pVgInfo->replications,
1,240✔
824
                 .config.tsdbCfg.precision = pVgInfo->precision,
1,240✔
825
                 .config.tsdbCfg.compression = pVgInfo->compression,
1,240✔
826
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
1,240✔
827
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
1,240✔
828
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
1,240✔
829
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
1,240✔
830
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
1,240✔
831
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
1,240✔
832
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
1,240✔
833
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
1,240✔
834
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
1,240✔
835
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
1,240✔
836
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
1,240✔
837
                 .config.ssCompact = pVgInfo->ssCompact,
1,240✔
838
                 .config.isAudit = pVgInfo->isAudit,
1,240✔
839
                 .config.allowDrop = pVgInfo->allowDrop,
1,240✔
840
                 .config.secureDelete = pVgInfo->secureDelete,
1,240✔
841
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
1,240✔
842
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
1,240✔
843
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
1,240✔
844
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
1,240✔
845
                 .config.walCfg.segSize = pVgInfo->walSegSize,
1,240✔
846
                 .config.walCfg.level = pVgInfo->walLevel,
1,240✔
847
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
848
                 .diskPrimary = pVgInfo->diskPrimary,
1,240✔
849
      };
850
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
1,240✔
851
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
1,240✔
852
               pVgInfo->vgId);
853
      vnode.path = path;
1,240✔
854

855
      int32_t rollback = vnodeShouldRollback(&vnode);
1,240✔
856
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
1,240✔
UNCOV
857
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
858
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
UNCOV
859
        TAOS_CHECK_EXIT(code);
×
860
      } else {
861
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
1,240✔
862
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
863

864
        SMetaReader mr = {0};
1,240✔
865
        tb_uid_t    suid = 0;
1,240✔
866
        SMeta      *pMeta = vnode.pMeta;
1,240✔
867

868
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
1,240✔
869
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
1,240✔
UNCOV
870
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
871
        }
872
        taosArrayClear(suidList);
1,240✔
873
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
1,240✔
874
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,240✔
875
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
876
        int32_t nStbs = taosArrayGetSize(suidList);
1,240✔
877
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
1,240✔
UNCOV
878
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
879
        }
880
        for (int32_t i = 0; i < nStbs; ++i) {
4,960✔
881
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
3,720✔
882
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
3,720✔
883
                pVgInfo->dbId, suid, pReq->dnodeId);
884
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
3,720✔
UNCOV
885
            TSDB_CHECK_CODE(code, lino, _exit0);
×
886
          }
887
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
3,720✔
888
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
3,720✔
UNCOV
889
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
890
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
891
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
892
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
UNCOV
893
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
894
          }
895
          SMountStbInfo stbInfo = {
3,720✔
896
              .req.source = TD_REQ_FROM_APP,
897
              .req.suid = suid,
898
              .req.colVer = mr.me.stbEntry.schemaRow.version,
3,720✔
899
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
3,720✔
900
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
3,720✔
901
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
3,720✔
902
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
3,720✔
903
          };
904
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
3,720✔
905
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
3,720✔
UNCOV
906
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
907
          }
908
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
3,720✔
909
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
910
          }
911

912
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
3,720✔
UNCOV
913
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
914
          }
915
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
3,720✔
916
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
917
          }
918
          taosArrayClear(pCols);
3,720✔
919
          taosArrayClear(pTags);
3,720✔
920
          taosArrayClear(pColExts);
3,720✔
921
          taosArrayClear(pTagExts);
3,720✔
922
          stbInfo.req.pColumns = pCols;
3,720✔
923
          stbInfo.req.pTags = pTags;
3,720✔
924
          stbInfo.pColExts = pColExts;
3,720✔
925
          stbInfo.pTagExts = pTagExts;
3,720✔
926

927
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
22,320✔
928
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
18,600✔
929
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
18,600✔
930
            SFieldWithOptions col = {
18,600✔
931
                .type = pSchema->type,
18,600✔
932
                .flags = pSchema->flags,
18,600✔
933
                .bytes = pSchema->bytes,
18,600✔
934
                .compress = pColComp->alg,
18,600✔
935
            };
936
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
18,600✔
937
            if (pSchema->colId != pColComp->id) {
18,600✔
UNCOV
938
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
939
            }
940
            if (mr.me.pExtSchemas) {
18,600✔
941
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
942
            }
943
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
18,600✔
944
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
37,200✔
945
          }
946
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
8,680✔
947
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
4,960✔
948
            SField   tag = {
4,960✔
949
                  .type = pSchema->type,
4,960✔
950
                  .flags = pSchema->flags,
4,960✔
951
                  .bytes = pSchema->bytes,
4,960✔
952
            };
953
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
4,960✔
954
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
4,960✔
955
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
9,920✔
956
          }
957
          tDecoderClear(&mr.coder);
3,720✔
958

959
          // serialize the SMountStbInfo
960
          int32_t firstPartLen = 0;
3,720✔
961
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
3,720✔
962
          if (msgLen <= 0) {
3,720✔
UNCOV
963
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
964
          }
965
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
3,720✔
966
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
3,720✔
967
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
3,720✔
968
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
3,720✔
969
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
3,720✔
UNCOV
970
            taosMemoryFree(pBuf);
×
UNCOV
971
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
972
          }
973
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
7,440✔
974
            taosMemoryFree(pBuf);
×
UNCOV
975
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
976
          }
977
        }
978
      _exit0:
1,240✔
979
        metaReaderClear(&mr);
1,240✔
980
        metaClose(&vnode.pMeta);
1,240✔
981
        TAOS_CHECK_EXIT(code);
1,240✔
982
      }
983
      break;  // retrieve stbs from one vnode is enough
1,240✔
984
    }
985
  }
986
_exit:
620✔
987
  if (code != 0) {
620✔
UNCOV
988
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
989
           pReq->dnodeId, tstrerror(code), path);
990
  }
991
  taosArrayDestroy(suidList);
620✔
992
  taosArrayDestroy(pCols);
620✔
993
  taosArrayDestroy(pTags);
620✔
994
  taosArrayDestroy(pColExts);
620✔
995
  taosArrayDestroy(pTagExts);
620✔
996
  TAOS_RETURN(code);
620✔
997
}
998

999
/**
 * Open "<mountPath>/.running" (created and truncated if needed) and lock it.
 *
 * When taosLockFile() fails the call is retried after taosMsleep(1000) until it succeeds or the number
 * of failed attempts reaches retryLimit; at least one attempt is always made.
 *
 * @param mountName   mount name, used for logging only
 * @param mountPath   root directory of the mount
 * @param pFile       out: the opened and locked file; closed and reset to NULL on failure
 * @param retryLimit  maximum number of failed lock attempts before giving up
 * @return 0 on success, otherwise an error code
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nFailed = 0;
  int32_t lockRet = 0;
  char    lockPath[PATH_MAX] = {0};

  (void)snprintf(lockPath, sizeof(lockPath), "%s%s.running", mountPath, TD_DIRSEP);

  *pFile = taosOpenFile(lockPath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC);
  TSDB_CHECK_NULL(*pFile, code, lino, _exit, terrno);

  for (;;) {
    lockRet = taosLockFile(*pFile);
    if (lockRet == 0) {
      break;
    }
    taosMsleep(1000);
    ++nFailed;
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, lockPath, tstrerror(lockRet), nFailed);
    if (nFailed >= retryLimit) {
      break;
    }
  }
  TAOS_CHECK_EXIT(lockRet);

_exit:
  if (code != 0) {
    // release the handle so the caller never sees a file it does not hold the lock on
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           lockPath);
  }
  TAOS_RETURN(code);
}
1025

1026
/**
 * Sanity-check a mount path before anything is read from it.
 *
 * Verifies, in order: the mount path itself, the ".running" lock (via vmMountCheckRunning, which stores
 * the locked file in pMountInfo->pFile), "<dnode dir>/dnode.json", the mnode directory, the vnode
 * directory and "<dnode dir>/config/local.json".
 *
 * @param pMgmt       vnode management handle (not referenced by this function)
 * @param pReq        retrieve request; supplies mountName, mountPath and dnodeId
 * @param pMountInfo  out: pFile receives the locked ".running" file
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
  // taosCheckAccessFile() is given TD_FILE_ACCESS_* flags elsewhere in this file; O_RDONLY is an open(2)
  // flag and does not belong to that flag set. Ask explicitly for "exists and readable".
  const int32_t accessMode = TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK;

  // seed `path` so the failure log names the mount path when one of the first two checks fails
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
           TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, accessMode), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1047

1048
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
1,868✔
1049
                                       SMountInfo *pMountInfo) {
1050
  int32_t code = 0, lino = 0;
1,868✔
1051
  pMountInfo->dnodeId = pReq->dnodeId;
1,868✔
1052
  pMountInfo->mountUid = pReq->mountUid;
1,868✔
1053
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
1,868✔
1054
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
1,868✔
1055
  pMountInfo->ignoreExist = pReq->ignoreExist;
1,868✔
1056
  pMountInfo->valLen = pReq->valLen;
1,868✔
1057
  pMountInfo->pVal = pReq->pVal;
1,868✔
1058
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
1,868✔
1059
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
932✔
1060
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
932✔
1061
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
620✔
1062
_exit:
620✔
1063
  if (code != 0) {
1,868✔
1064
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
1,248✔
1065
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1066
  }
1067
  TAOS_RETURN(code);
1,868✔
1068
}
1069

1070
/**
 * Handle a retrieve-mount-path request: decode it, collect the mount information and attach the
 * serialized SMountInfo to the message as its response.
 *
 * The response is built even when decoding or retrieval failed (`code != 0`); in that case the
 * SMountInfo holds whatever was collected before the failure. A failure while building the response
 * itself is tracked separately in `rspCode` and, when set, becomes the returned code.
 *
 * @param pMgmt  vnode management handle; a shallow copy with path reset to NULL is used internally
 * @param pMsg   request message; info.rsp / info.rspLen receive the response on success of the
 *               serialization step
 * @return 0 on success, otherwise an error code
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0;
  int32_t               lino = 0;
  int32_t               rspCode = 0;
  int32_t               rspLen = 0;
  void                 *pRsp = NULL;
  SRetrieveMountPathReq req = {0};
  SMountInfo            mountInfo = {0};
  SVnodeMgmt            vndMgmt = *pMgmt;

  vndMgmt.path = NULL;
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // size the response, allocate it, then serialize for real
  rspLen = tSerializeSMountInfo(NULL, 0, &mountInfo);
  TSDB_CHECK_CONDITION(rspLen >= 0, rspCode, lino, _exit, rspLen);
  pRsp = rpcMallocCont(rspLen);
  TSDB_CHECK_CONDITION(pRsp != NULL, rspCode, lino, _exit, terrno);
  rspLen = tSerializeSMountInfo(pRsp, rspLen, &mountInfo);
  TSDB_CHECK_CONDITION(rspLen >= 0, rspCode, lino, _exit, rspLen);
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

_exit:
  if (rspCode == 0 && code == 0) {
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
    int32_t nVgs = 0;
    for (int32_t iDb = 0; iDb < nDbs; ++iDb) {
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, iDb);
      nVgs += taosArrayGetSize(pDb->pVgs);
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
  } else if (rspCode == 0) {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  } else {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pRsp);
    code = rspCode;
  }

  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1114

1115
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
2,480✔
1116
                            SMountVnodeReq *req, STfs *pMountTfs) {
1117
  int32_t    code = 0;
2,480✔
1118
  SVnodeInfo info = {0};
2,480✔
1119
  char       hostDir[TSDB_FILENAME_LEN] = {0};
2,480✔
1120
  char       mountDir[TSDB_FILENAME_LEN] = {0};
2,480✔
1121
  char       mountVnode[32] = {0};
2,480✔
1122

1123
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
2,480✔
UNCOV
1124
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
UNCOV
1125
    return code;
×
1126
  }
1127

1128
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
2,480✔
1129
  if ((code = taosMkDir(hostDir))) {
2,480✔
UNCOV
1130
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1131
           tstrerror(code), hostDir);
UNCOV
1132
    return code;
×
1133
  }
1134

1135
  info.config = *pCfg;  // copy the config
2,480✔
1136
  info.state.committed = req->committed;
2,480✔
1137
  info.state.commitID = req->commitID;
2,480✔
1138
  info.state.commitTerm = req->commitTerm;
2,480✔
1139
  info.state.applied = req->committed;
2,480✔
1140
  info.state.applyTerm = req->commitTerm;
2,480✔
1141
  info.config.vndStats.numOfSTables = req->numOfSTables;
2,480✔
1142
  info.config.vndStats.numOfCTables = req->numOfCTables;
2,480✔
1143
  info.config.vndStats.numOfNTables = req->numOfNTables;
2,480✔
1144

1145
  SVnodeInfo oldInfo = {0};
2,480✔
1146
  oldInfo.config = vnodeCfgDefault;
2,480✔
1147
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
2,480✔
UNCOV
1148
    if (oldInfo.config.dbId != info.config.dbId) {
×
UNCOV
1149
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
UNCOV
1150
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1151
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1152
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1153
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1154
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1155

1156
    } else {
UNCOV
1157
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1158
    }
UNCOV
1159
    return code;
×
1160
  }
1161

1162
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
2,480✔
1163
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
2,480✔
1164
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
2,480✔
1165
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
2,480✔
1166
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1167
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
14,880✔
1168
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
12,400✔
1169
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
12,400✔
1170
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
12,400✔
UNCOV
1171
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1172
             mountSubDir, hostSubDir, tstrerror(code));
UNCOV
1173
      return code;
×
1174
    }
1175
  }
1176
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
2,480✔
1177
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
2,480✔
UNCOV
1178
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1179
           req->mountName, tstrerror(code), hostDir);
UNCOV
1180
    return code;
×
1181
  }
1182
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
2,480✔
1183
  return 0;
2,480✔
1184
}
1185

1186
// Handle a mount-vnode request carried by pMsg.
//
// The request is deserialized, the replica entry addressed to this node (replicas[selfIndex],
// or learnerReplicas[learnerSelfIndex] when selfIndex is -1) is checked against the local
// dnode id / port / fqdn, and the encrypt key is awaited. Then the vnode and wrapper configs
// are generated, the mount tfs is acquired, the host directory is prepared by vmMountVnode(),
// and the vnode is opened, registered, started and persisted in the vnode and mount lists.
//
// On a failure after the mount tfs acquisition was attempted, the failed vnode is closed and
// destroyed and the mount tfs is released if it had been acquired. pMsg->code receives the
// final code and the response payload is cleared.
//
// Returns 0 on success, otherwise an error code. An already existing vnode is answered
// with 0 as well.
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // report the learner's own dnode id, not the id of the voter at the same index
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  SReplica *pReplica = NULL;
  if (pCreateReq->selfIndex != -1) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // log first: the message reads fields of req, so req must still be intact
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  if (taosWaitCfgKeyLoaded() != 0) {
    // capture terrno right after the failing call, before any other call can overwrite it
    code = terrno;
    dError("mount:%s, vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.mountName,
           pCreateReq->vgId, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): an existing vnode is answered with 0 (pMsg->code is left untouched);
    // confirm whether TSDB_CODE_VND_ALREADY_EXIST should be reported to the caller instead.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1288
#endif  // USE_MOUNT
1289

1290
// alter replica doesn't use this, but restore dnode still use this
1291
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
4,376,502✔
1292
  SAlterVnodeTypeReq req = {0};
4,376,502✔
1293
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
4,376,502✔
UNCOV
1294
    terrno = TSDB_CODE_INVALID_MSG;
×
UNCOV
1295
    return -1;
×
1296
  }
1297

1298
  if (req.learnerReplicas == 0) {
1299
    req.learnerSelfIndex = -1;
1300
  }
1301

1302
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
4,376,502✔
1303
        TMSG_INFO(pMsg->msgType));
1304

1305
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
4,376,502✔
1306
  if (pVnode == NULL) {
4,376,502✔
UNCOV
1307
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
UNCOV
1308
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
UNCOV
1309
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1310
    return -1;
×
1311
  }
1312

1313
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
4,376,502✔
1314
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
4,376,502✔
1315
  if (role == TAOS_SYNC_ROLE_VOTER) {
4,376,502✔
UNCOV
1316
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
UNCOV
1317
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
UNCOV
1318
    vmReleaseVnode(pMgmt, pVnode);
×
1319
    return -1;
×
1320
  }
1321

1322
  dInfo("vgId:%d, checking node catch up", req.vgId);
4,376,502✔
1323
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
4,376,502✔
1324
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
4,179,598✔
1325
    vmReleaseVnode(pMgmt, pVnode);
4,179,598✔
1326
    return -1;
4,179,598✔
1327
  }
1328

1329
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
196,904✔
1330

1331
  int32_t vgId = req.vgId;
196,904✔
1332
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
196,904✔
1333
        req.selfIndex, req.strict, req.changeVersion);
1334
  for (int32_t i = 0; i < req.replica; ++i) {
788,020✔
1335
    SReplica *pReplica = &req.replicas[i];
591,116✔
1336
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
591,116✔
1337
  }
1338
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
196,904✔
UNCOV
1339
    SReplica *pReplica = &req.learnerReplicas[i];
×
UNCOV
1340
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1341
  }
1342

1343
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
196,904✔
1344
      req.learnerSelfIndex >= req.learnerReplica) {
196,904✔
UNCOV
1345
    terrno = TSDB_CODE_INVALID_MSG;
×
UNCOV
1346
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
UNCOV
1347
    vmReleaseVnode(pMgmt, pVnode);
×
1348
    return -1;
×
1349
  }
1350

1351
  SReplica *pReplica = NULL;
196,904✔
1352
  if (req.selfIndex != -1) {
196,904✔
1353
    pReplica = &req.replicas[req.selfIndex];
196,904✔
1354
  } else {
UNCOV
1355
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1356
  }
1357

1358
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
196,904✔
1359
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
196,904✔
UNCOV
1360
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
UNCOV
1361
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1362
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1363
    vmReleaseVnode(pMgmt, pVnode);
×
1364
    return -1;
×
1365
  }
1366

1367
  dInfo("vgId:%d, start to close vnode", vgId);
196,904✔
1368
  SWrapperCfg wrapperCfg = {
196,904✔
1369
      .dropped = pVnode->dropped,
196,904✔
1370
      .vgId = pVnode->vgId,
196,904✔
1371
      .vgVersion = pVnode->vgVersion,
196,904✔
1372
      .diskPrimary = pVnode->diskPrimary,
196,904✔
1373
  };
1374
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
196,904✔
1375

1376
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
196,904✔
1377
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
196,904✔
1378

1379
  int32_t diskPrimary = wrapperCfg.diskPrimary;
196,904✔
1380
  char    path[TSDB_FILENAME_LEN] = {0};
196,904✔
1381
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
196,904✔
1382

1383
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
196,904✔
1384
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
196,904✔
UNCOV
1385
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
UNCOV
1386
    return -1;
×
1387
  }
1388

1389
  dInfo("vgId:%d, begin to open vnode", vgId);
196,904✔
1390
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
196,904✔
1391
  if (pImpl == NULL) {
196,904✔
UNCOV
1392
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
UNCOV
1393
    return -1;
×
1394
  }
1395

1396
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
196,904✔
UNCOV
1397
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
UNCOV
1398
    return -1;
×
1399
  }
1400

1401
  if (vnodeStart(pImpl) != 0) {
196,904✔
UNCOV
1402
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
UNCOV
1403
    return -1;
×
1404
  }
1405

1406
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
196,904✔
1407
        req.vgId, TMSG_INFO(pMsg->msgType));
1408
  return 0;
196,904✔
1409
}
1410

UNCOV
1411
// Handle a check-learner-catchup request: report whether the vnode named by the request
// is a non-voter that has caught up. Nothing is modified on the vnode.
//
// Returns 0 when the vnode exists, is not a voter and vnodeIsCatchUp() reports 1.
// Returns -1 with terrno set to TSDB_CODE_INVALID_MSG, TSDB_CODE_VND_NOT_EXIST,
// TSDB_CODE_VND_ALREADY_IS_VOTER or TSDB_CODE_VND_NOT_CATCH_UP otherwise.
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // test the learner COUNT (learnerReplica); learnerReplicas is the replica array itself
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // log the reason left by vmAcquireVnode before terrno is overwritten
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1458

1459
// Handle a disable-vnode-write request: copy req.disable into the `disable` field of the
// vnode named by req.vgId.
//
// Returns 0 on success; -1 with terrno set to TSDB_CODE_INVALID_MSG when the request cannot
// be deserialized, or to TSDB_CODE_VND_NOT_EXIST when the vnode cannot be acquired.
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // log the reason left by vmAcquireVnode before terrno is overwritten;
    // nothing was acquired, so there is nothing to release
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->disable = req.disable;
  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1480

1481
// Handle a set-keep-version request. The message starts with an SMsgHead whose contLen and
// vgId are converted in place with ntohl(); the bytes after the header are deserialized as
// SVndSetKeepVersionReq, and the keep version is applied to the vnode named by the header
// through vnodeSetWalKeepVersion().
//
// Returns 0 on success; -1 with terrno set to TSDB_CODE_INVALID_MSG (message shorter than
// its header, or undecodable body), TSDB_CODE_VND_NOT_EXIST (vnode cannot be acquired), or
// the code returned by vnodeSetWalKeepVersion().
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  // the header is read and the body length is computed below, so the payload must hold
  // at least a full SMsgHead; otherwise the subtraction would underflow
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMsgHead)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  int32_t bodyLen = pMsg->contLen - (int32_t)sizeof(SMsgHead);

  SVndSetKeepVersionReq req = {0};
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), bodyLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    // log the reason left by vmAcquireVnode before terrno is overwritten
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
  // This bypasses Raft to avoid timing issues where WAL might be deleted
  // before keepVersion is set through the Raft consensus process
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
  if (code != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
    terrno = code;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);

  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1518

1519
// Handle an alter-vnode-hash-range request, which turns the vnode req.srcVgId into the
// vnode req.dstVgId.
//
// The destination vgId must not exist yet and the source vnode must exist. The pending
// target is recorded in pVnode->toVgId and the vnode list is written ("prepare"); then the
// source vnode is closed, vnodeAlterHashRange() is applied from the source path to the
// destination path, the destination vnode is opened, registered and started, and the vnode
// list is written again ("complete").
//
// Returns 0 on success, -1 on failure. terrno is assigned here only for the invalid-message,
// already-exist and not-exist cases. A failure after the source vnode was closed leaves it
// closed.
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    // log the reason left by vmAcquireVnode before terrno is overwritten
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // the wrapper config describes the destination vnode; the rest is copied from the source
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // nothing has been changed on disk yet: undo the in-memory marker and give back the
    // reference taken above, as the drop handler does on the same failure
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1604

1605
// Handle an alter-vnode-replica request.
//
// The request is rejected (return -1, terrno set) when it cannot be deserialized, when the
// replica indexes are inconsistent, when the replica entry addressed to this node does not
// match the local dnode id / port / fqdn, or when the vnode does not exist.
//
// Otherwise the vnode is closed, its replica configuration is rewritten on disk with
// vnodeAlterReplica(), and the vnode is re-opened, registered and started again.
// A failure after the close returns -1 and leaves the vnode closed; those paths do not
// assign terrno themselves.
//
// Returns 0 on success, -1 on failure.
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // the last field is the dnode id of the replica, not its port
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // this node must be addressed either as a voter or as a learner, with an in-range index
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // log the reason left by vmAcquireVnode before terrno is overwritten
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // snapshot what is needed to re-register the vnode; pVnode is not used after the close
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1708

1709
// Handle an alter-vnode-elect-baseline request: pass the electBaseLine value of the request
// to vnodeSetElectBaseline() for the vnode named by the request.
//
// Returns 0 on success, TSDB_CODE_INVALID_MSG when the request cannot be deserialized,
// the current terrno when the vnode cannot be acquired, and -1 when
// vnodeSetElectBaseline() reports a failure.
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeElectBaselineReq baselineReq = {0};

  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &baselineReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  const int32_t targetVgId = baselineReq.vgId;
  dInfo(
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
      targetVgId, TMSG_INFO(pMsg->msgType), baselineReq.electBaseLine);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, targetVgId);
  if (!pTarget) {
    dError("vgId:%d, failed to alter replica since %s", targetVgId, terrstr());
    return terrno;
  }

  // the vnode reference is given back on both outcomes, so compute the result first
  int32_t result = 0;
  if (vnodeSetElectBaseline(pTarget->pImpl, baselineReq.electBaseLine) != 0) {
    result = -1;
  }

  vmReleaseVnode(pMgmt, pTarget);
  return result;
}
1734

1735
// Handle a drop-vnode request.
//
// The vnode is first flagged as dropped and the vnode list is written; if that write fails
// the flag is cleared again and the write's code is returned. Otherwise the vnode is closed
// and the list is written once more; a failure of this second write is only logged.
//
// Returns 0 on success; TSDB_CODE_INVALID_MSG, TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL or
// TSDB_CODE_VND_NOT_EXIST (each also stored in terrno) when the request is rejected; or the
// code returned by the first vmWriteVnodeListToFile() call.
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq request = {0};

  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &request) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  const int32_t vgId = request.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  // the request must be addressed to this dnode
  if (pMgmt->pData->dnodeId != request.dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d, %s", request.vgId, request.dnodeId, tstrerror(terrno));
    return terrno;
  }

  SVnodeObj *pTarget = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (!pTarget) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // persist the dropped flag before closing; roll it back when it cannot be persisted
  pTarget->dropped = 1;
  int32_t writeCode = vmWriteVnodeListToFile(pMgmt);
  if (writeCode != 0) {
    pTarget->dropped = 0;
    vmReleaseVnode(pMgmt, pTarget);
    return writeCode;
  }

  vmCloseVnode(pMgmt, pTarget, false, false);

  // a failure of the final list write is logged but does not fail the request
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
1774

1775
/**
 * Handle an arbitrator heartbeat request addressed to this dnode.
 *
 * Decodes the request, verifies that it targets the local dnode and carries a
 * non-empty arb token, then builds one response member for each requested
 * vgroup whose vnode can be acquired, whose arb token can be read and whose
 * arb term can be updated. Vgroups that fail any of those steps are logged and
 * left out of the response. The serialized response is attached to
 * pMsg->info.rsp / pMsg->info.rspLen.
 *
 * @param pMgmt vnode management object; supplies the local dnode id and the vnodes.
 * @param pMsg  incoming RPC message; on success info.rsp and info.rspLen are filled in.
 * @return 0 on success; -1 when the request cannot be deserialized (terrno is
 *         set to TSDB_CODE_INVALID_MSG); otherwise the non-zero terrno describing
 *         the failure.
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    // NOTE(review): this early return skips tFreeSVArbHeartBeatReq(); confirm the
    // deserializer releases anything it allocated before reporting failure.
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // The function returns terrno, so make sure a failure can never be reported as success.
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  // Per-vgroup failures above are tolerated, so clear whatever they left in terrno.
  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
1864

1865
SArray *vmGetMsgHandles() {
1,275,512✔
1866
  int32_t code = -1;
1,275,512✔
1867
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
1,275,512✔
1868
  if (pArray == NULL) goto _OVER;
1,275,512✔
1869

1870
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1871
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,275,512✔
1872
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,275,512✔
1873
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,275,512✔
1874
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1875
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1877
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1878
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1879
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1880
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1881
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1882
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1883
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1884
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1885
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1886
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1887
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1888
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1889
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1890
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1891
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1892
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1893
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1894
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1895
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1896
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1899
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,275,512✔
1900
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,275,512✔
1901
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1902
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1903
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1904
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1905
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1906
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1907
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1908
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1909
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1910
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1911
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1912
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1913
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1914
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1915
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1916
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1917
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1918
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1919

1920
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1921
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1922
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1923
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1924
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1925
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1926
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1927
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1928
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1929
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1930
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1931
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,275,512✔
1932
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1933
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1934
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1935
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1936
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1937
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1938
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1939
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1940
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1941
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1942
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1943

1944
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1945
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1946
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1947
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1948
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1949
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1950
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1951
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1952
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1953
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1954
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1955
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1956

1957
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,275,512✔
1958
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,275,512✔
1959
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,275,512✔
1960
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,275,512✔
1961
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,275,512✔
1962

1963
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,275,512✔
1964
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1965
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,275,512✔
1966

1967
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
1,275,512✔
1968
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
1,275,512✔
1969
  if (dmSetMgmtHandle(pArray, TDMT_VND_AUDIT_RECORD, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,275,512✔
1970

1971
  code = 0;
1,275,512✔
1972

1973
_OVER:
1,275,512✔
1974
  if (code != 0) {
1,275,512✔
UNCOV
1975
    taosArrayDestroy(pArray);
×
UNCOV
1976
    return NULL;
×
1977
  } else {
1978
    return pArray;
1,275,512✔
1979
  }
1980
}
1981

UNCOV
1982
/**
 * Report the raw write metrics of every non-failed vnode in the running hash
 * to the metrics system, then reset the counters of each vnode that was
 * reported successfully.
 *
 * The whole walk is done while holding pMgmt->hashLock for reading. A vnode
 * whose metrics cannot be read, whose db name cannot be parsed, or whose
 * metrics cannot be added is logged and skipped; the remaining vnodes are
 * still processed.
 *
 * @param pMgmt     vnode management object that owns the running-vnode hash.
 * @param clusterId cluster id passed through to addWriteMetrics().
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the for-header so that every `continue` below
  // moves on to the next entry. Advancing only at the bottom of a while-loop
  // made a skipped entry get revisited forever with the lock held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    // Add the metrics to the global metrics system with cluster ID
    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // Only reset the vnode's write metrics once they have been added successfully.
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc