• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5028

20 Apr 2026 09:07AM UTC coverage: 72.986% (-0.01%) from 72.996%
#5028

push

travis-ci

web-flow
perf: optimize compact progress query from O(m*n) to O(n+m) (#35115)

104 of 141 new or added lines in 4 files covered. (73.76%)

5170 existing lines in 133 files now uncovered.

273777 of 375111 relevant lines covered (72.99%)

130458474.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.31
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22
#include "tencrypt.h"
23

24
extern taos_counter_t *tsInsertCounter;
25

26
#ifdef TD_ENTERPRISE
27
// Forward declaration for enterprise function
28
extern int32_t vnodeGetCompactProgress(SVnode *pVnode, int32_t compactId, SQueryCompactProgressRsp *pRsp);
29
#endif
30

31
// Forward declaration for function defined in metrics.c
32
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
33
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
34

35
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
51,438,798✔
36
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
51,438,798✔
37
  if (pInfo->pVloads == NULL) return;
51,438,798✔
38

39
  tfsUpdateSize(pMgmt->pTfs);
51,438,798✔
40

41
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
51,438,798✔
42

43
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
51,438,798✔
44
  while (pIter) {
201,469,417✔
45
    SVnodeObj **ppVnode = pIter;
150,030,619✔
46
    if (ppVnode == NULL || *ppVnode == NULL) continue;
150,030,619✔
47

48
    SVnodeObj *pVnode = *ppVnode;
150,030,619✔
49
    SVnodeLoad vload = {.vgId = pVnode->vgId};
150,030,619✔
50
    if (!pVnode->failed) {
150,030,619✔
51
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
150,030,619✔
52
        dError("failed to get vnode load");
×
53
      }
54
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
150,030,619✔
55
    }
56
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
300,061,238✔
57
      dError("failed to push vnode load");
×
58
    }
59
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
150,030,619✔
60
  }
61

62
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
51,438,798✔
63
}
64

65
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
×
66
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
67

68
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
69
  while (pIter) {
×
70
    SVnodeObj **ppVnode = pIter;
×
71
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
72

73
    SVnodeObj *pVnode = *ppVnode;
×
74

75
    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
×
76
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
×
77
    }
78
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
79
  }
80

81
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
82
}
×
83

84
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
85
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
86
  if (!pInfo->pVloads) return;
×
87

88
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
89

90
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
91
  while (pIter) {
×
92
    SVnodeObj **ppVnode = pIter;
×
93
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
94

95
    SVnodeObj *pVnode = *ppVnode;
×
96
    if (!pVnode->failed) {
×
97
      SVnodeLoadLite vload = {0};
×
98
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
99
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
100
          taosArrayDestroy(pInfo->pVloads);
×
101
          pInfo->pVloads = NULL;
×
102
          break;
×
103
        }
104
      }
105
    }
106
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
107
  }
108

109
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
110
}
111

112
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
52✔
113
  SMonVloadInfo vloads = {0};
52✔
114
  vmGetVnodeLoads(pMgmt, &vloads, true);
52✔
115

116
  SArray *pVloads = vloads.pVloads;
52✔
117
  if (pVloads == NULL) return;
52✔
118

119
  int32_t totalVnodes = 0;
52✔
120
  int32_t masterNum = 0;
52✔
121
  int64_t numOfSelectReqs = 0;
52✔
122
  int64_t numOfInsertReqs = 0;
52✔
123
  int64_t numOfInsertSuccessReqs = 0;
52✔
124
  int64_t numOfBatchInsertReqs = 0;
52✔
125
  int64_t numOfBatchInsertSuccessReqs = 0;
52✔
126

127
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
156✔
128
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
104✔
129
    numOfSelectReqs += pLoad->numOfSelectReqs;
104✔
130
    numOfInsertReqs += pLoad->numOfInsertReqs;
104✔
131
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
104✔
132
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
104✔
133
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
104✔
134
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
104✔
135
      masterNum++;
104✔
136
    }
137
    totalVnodes++;
104✔
138
  }
139

140
  pInfo->vstat.totalVnodes = totalVnodes;
52✔
141
  pInfo->vstat.masterNum = masterNum;
52✔
142
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
52✔
143
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
52✔
144
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
52✔
145
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
52✔
146
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
52✔
147
  pMgmt->state.totalVnodes = totalVnodes;
52✔
148
  pMgmt->state.masterNum = masterNum;
52✔
149
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
52✔
150
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
52✔
151
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
52✔
152
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
52✔
153
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
52✔
154

155
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
52✔
156
    dError("failed to get tfs monitor info");
×
157
  }
158
  taosArrayDestroy(pVloads);
52✔
159
}
160

161
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
52✔
162
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
52✔
163
  if (list_size == 0) return;
52✔
164
  int32_t *vgroup_ids;
×
165
  char   **keys;
×
166
  int      r = 0;
×
167
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
168
  if (r) {
×
169
    dError("failed to get vgroup ids");
×
170
    return;
×
171
  }
172
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
173
  for (int i = 0; i < list_size; i++) {
×
174
    int32_t vgroup_id = vgroup_ids[i];
×
175
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
176
    if (vnode == NULL) {
×
177
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
178
      if (r) {
×
179
        dError("failed to delete monitor sample key:%s", keys[i]);
×
180
      }
181
    }
182
  }
183
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
184
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
185
  if (keys) taosMemoryFree(keys);
×
186
  return;
×
187
}
188

189
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
×
190
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
×
191
    return;
×
192
  }
193

194
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
195
  void     *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
196
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
197
  if (pValidVgroups == NULL) {
×
198
    (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
199
    return;
×
200
  }
201

202
  while (pIter != NULL) {
×
203
    SVnodeObj **ppVnode = pIter;
×
204
    if (ppVnode && *ppVnode) {
×
205
      int32_t vgId = (*ppVnode)->vgId;
×
206
      char    dummy = 1;  // hash table value (we only care about the key)
×
207
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
×
208
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
×
209
      }
210
    }
211
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
212
  }
213
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
214

215
  // Clean expired metrics by removing metrics for non-existent vgroups
216
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
×
217
  if (code != TSDB_CODE_SUCCESS) {
×
218
    dError("failed to clean expired metrics, code:%d", code);
×
219
  }
220

221
  taosHashCleanup(pValidVgroups);
×
222
}
223

224
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
3,249,256✔
225
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
3,249,256✔
226

227
  pCfg->vgId = pCreate->vgId;
3,249,256✔
228
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
3,249,596✔
229
  pCfg->dbId = pCreate->dbUid;
3,250,887✔
230
  pCfg->szPage = pCreate->pageSize * 1024;
3,249,590✔
231
  pCfg->szCache = pCreate->pages;
3,250,107✔
232
  pCfg->cacheLast = pCreate->cacheLast;
3,248,573✔
233
  pCfg->cacheLastSize = pCreate->cacheLastSize;
3,249,596✔
234
  pCfg->cacheLastShardBits = pCreate->cacheLastShardBits;
3,249,157✔
235
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
3,250,887✔
236
  pCfg->isWeak = true;
3,247,340✔
237
  pCfg->isTsma = pCreate->isTsma;
3,249,151✔
238
  pCfg->tsdbCfg.compression = pCreate->compression;
3,250,376✔
239
  pCfg->tsdbCfg.precision = pCreate->precision;
3,247,812✔
240
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
3,248,986✔
241
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
3,247,908✔
242
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
3,249,157✔
243
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
3,249,157✔
244
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
3,249,579✔
245
  pCfg->tsdbCfg.minRows = pCreate->minRows;
3,248,058✔
246
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
3,249,721✔
247
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
248
  // pCfg->tsdbCfg.encryptAlgr = pCreate->encryptAlgr;
249
  tstrncpy(pCfg->tsdbCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
3,246,986✔
250
  if (pCfg->tsdbCfg.encryptAlgr == DND_CA_SM4 || pCfg->tsdbCfg.encryptData.encryptAlgrName[0] != '\0') {
3,248,990✔
251
    tstrncpy(pCfg->tsdbCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
7,318✔
252
  }
253
#else
254
  pCfg->tsdbCfg.encryptAlgr = 0;
255
#endif
256

257
  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
3,246,973✔
258
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
3,248,905✔
259
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
3,246,543✔
260
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
3,248,775✔
261
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
3,247,914✔
262
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
3,248,591✔
263
  pCfg->walCfg.level = pCreate->walLevel;
3,247,315✔
264
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
265
  // pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
266
  tstrncpy(pCfg->walCfg.encryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
3,247,836✔
267
  if (pCfg->walCfg.encryptAlgr == DND_CA_SM4 || pCfg->walCfg.encryptData.encryptAlgrName[0] != '\0') {
3,250,240✔
268
    tstrncpy(pCfg->walCfg.encryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
5,502✔
269
  }
270
#else
271
  pCfg->walCfg.encryptAlgr = 0;
272
#endif
273

274
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
275
  // pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
276
  tstrncpy(pCfg->tdbEncryptData.encryptAlgrName, pCreate->encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
3,248,650✔
277
  if (pCfg->tdbEncryptAlgr == DND_CA_SM4 || pCfg->tdbEncryptData.encryptAlgrName[0] != '\0') {
3,245,651✔
278
    tstrncpy(pCfg->tdbEncryptData.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
5,671✔
279
  }
280
#else
281
  pCfg->tdbEncryptAlgr = 0;
282
#endif
283

284
  pCfg->sttTrigger = pCreate->sstTrigger;
3,247,100✔
285
  pCfg->hashBegin = pCreate->hashBegin;
3,248,157✔
286
  pCfg->hashEnd = pCreate->hashEnd;
3,245,697✔
287
  pCfg->hashMethod = pCreate->hashMethod;
3,247,074✔
288
  pCfg->hashPrefix = pCreate->hashPrefix;
3,246,175✔
289
  pCfg->hashSuffix = pCreate->hashSuffix;
3,246,691✔
290
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
3,245,259✔
291

292
  pCfg->ssChunkSize = pCreate->ssChunkSize;
3,247,404✔
293
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
3,242,680✔
294
  pCfg->ssCompact = pCreate->ssCompact;
3,248,135✔
295

296
  pCfg->isAudit = pCreate->isAudit;
3,245,772✔
297
  pCfg->allowDrop = pCreate->allowDrop;
3,248,134✔
298
  pCfg->secureDelete = pCreate->secureDelete;
3,247,558✔
299

300
  pCfg->standby = 0;
3,248,108✔
301
  pCfg->syncCfg.replicaNum = 0;
3,249,123✔
302
  pCfg->syncCfg.totalReplicaNum = 0;
3,246,202✔
303
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
3,245,394✔
304

305
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
3,247,390✔
306
  for (int32_t i = 0; i < pCreate->replica; ++i) {
7,498,128✔
307
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
4,248,999✔
308
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
4,246,519✔
309
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
4,244,025✔
310
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
4,246,300✔
311
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
4,246,142✔
312
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
4,244,268✔
313
    pCfg->syncCfg.replicaNum++;
4,250,668✔
314
  }
315
  if (pCreate->selfIndex != -1) {
3,249,772✔
316
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
3,146,987✔
317
  }
318
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
3,351,115✔
319
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
102,280✔
320
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
102,280✔
321
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
102,280✔
322
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
102,280✔
323
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
102,280✔
324
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
102,280✔
325
    pCfg->syncCfg.totalReplicaNum++;
102,280✔
326
  }
327
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
3,245,623✔
328
  if (pCreate->learnerSelfIndex != -1) {
3,246,678✔
329
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
102,280✔
330
  }
331
}
3,247,264✔
332

333
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
3,247,934✔
334
  pCfg->vgId = pCreate->vgId;
3,247,934✔
335
  pCfg->vgVersion = pCreate->vgVersion;
3,244,787✔
336
  pCfg->dropped = 0;
3,243,312✔
337
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
3,247,155✔
338
}
3,246,261✔
339

340
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,242,110✔
341
  SCreateVnodeReq req = {0};
3,242,110✔
342
  SVnodeCfg       vnodeCfg = {0};
3,249,799✔
343
  SWrapperCfg     wrapperCfg = {0};
3,248,456✔
344
  int32_t         code = -1;
3,249,799✔
345
  char            path[TSDB_FILENAME_LEN] = {0};
3,249,799✔
346

347
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
3,248,234✔
348
    return TSDB_CODE_INVALID_MSG;
×
349
  }
350

351
  if (req.learnerReplica == 0) {
3,246,259✔
352
    req.learnerSelfIndex = -1;
3,145,520✔
353
  }
354

355
  dInfo(
3,246,259✔
356
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
357
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
358
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
359
      "precision:%d compression:%d minRows:%d maxRows:%d"
360
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
361
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
362
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d encryptAlgrName:%s, "
363
      "isAudit:%" PRIu8 " allowDrop:%" PRIu8,
364
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
365
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
366
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
367
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
368
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
369
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
370
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
371
      req.encryptAlgorithm, req.encryptAlgrName, req.isAudit, req.allowDrop);
372

373
  for (int32_t i = 0; i < req.replica; ++i) {
7,499,719✔
374
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
4,249,920✔
375
          req.replicas[i].id);
376
  }
377
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
3,352,079✔
378
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
102,280✔
379
          req.learnerReplicas[i].port, req.replicas[i].id);
380
  }
381

382
  SReplica *pReplica = NULL;
3,249,799✔
383
  if (req.selfIndex != -1) {
3,249,799✔
384
    pReplica = &req.replicas[req.selfIndex];
3,147,519✔
385
  } else {
386
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
102,280✔
387
  }
388
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
3,249,799✔
389
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
3,249,459✔
390
    (void)tFreeSCreateVnodeReq(&req);
340✔
391

392
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
393
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
×
394
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
395
    return code;
×
396
  }
397

398
  if (taosWaitCfgKeyLoaded() != 0) {
3,249,459✔
399
    (void)tFreeSCreateVnodeReq(&req);
×
400
    code = terrno;
×
401
    dError("vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.vgId, tstrerror(code));
×
402
    return code;
×
403
  }
404

405
  if (req.encryptAlgrName[0] != '\0' && strlen(tsDataKey) == 0) {
3,248,508✔
406
    (void)tFreeSCreateVnodeReq(&req);
×
407
    code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
408
    dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
409
    return code;
×
410
  }
411

412
  vmGenerateVnodeCfg(&req, &vnodeCfg);
3,248,508✔
413

414
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
3,247,335✔
415

416
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
3,247,792✔
417
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
3,249,213✔
418
    dError("vgId:%d, already exist", req.vgId);
11,813✔
419
    (void)tFreeSCreateVnodeReq(&req);
11,813✔
420
    vmReleaseVnode(pMgmt, pVnode);
11,813✔
421
    code = TSDB_CODE_VND_ALREADY_EXIST;
11,813✔
422
    return 0;
11,813✔
423
  }
424

425
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
3,237,400✔
426
  if (diskPrimary < 0) {
3,234,409✔
427
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
3,234,409✔
428
  }
429
  wrapperCfg.diskPrimary = diskPrimary;
3,237,986✔
430

431
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
3,237,986✔
432

433
  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
3,237,986✔
434
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
×
435
    vmReleaseVnode(pMgmt, pVnode);
×
436
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
437
    (void)tFreeSCreateVnodeReq(&req);
×
438
    return code;
×
439
  }
440

441
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
3,237,986✔
442
  if (pImpl == NULL) {
3,237,986✔
443
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
444
    code = terrno != 0 ? terrno : -1;
×
445
    goto _OVER;
×
446
  }
447

448
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
3,237,986✔
449
  if (code != 0) {
3,237,986✔
450
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
451
    code = terrno != 0 ? terrno : code;
×
452
    goto _OVER;
×
453
  }
454

455
  code = vnodeStart(pImpl);
3,237,986✔
456
  if (code != 0) {
3,237,986✔
457
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
458
    goto _OVER;
×
459
  }
460

461
  code = vmWriteVnodeListToFile(pMgmt);
3,237,986✔
462
  if (code != 0) {
3,237,986✔
463
    code = terrno != 0 ? terrno : code;
×
464
    goto _OVER;
×
465
  }
466

467
_OVER:
3,237,986✔
468
  vmCleanPrimaryDisk(pMgmt, req.vgId);
3,237,986✔
469

470
  if (code != 0) {
3,237,986✔
471
    vmCloseFailedVnode(pMgmt, req.vgId);
×
472

473
    vnodeClose(pImpl);
×
474
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
475
  } else {
476
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
3,237,986✔
477
          TMSG_INFO(pMsg->msgType));
478
  }
479

480
  (void)tFreeSCreateVnodeReq(&req);
3,237,986✔
481
  terrno = code;
3,237,986✔
482
  return code;
3,237,986✔
483
}
484

485
#ifdef USE_MOUNT
486
typedef struct {
487
  int64_t dbId;
488
  int32_t vgId;
489
  int32_t diskPrimary;
490
} SMountDbVgId;
491
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
492
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
493

494
static int compareVnodeInfo(const void *p1, const void *p2) {
2,863✔
495
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
2,863✔
496
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
2,863✔
497

498
  if (v1->config.dbId == v2->config.dbId) {
2,863✔
499
    if (v1->config.vgId == v2->config.vgId) {
1,636✔
500
      return 0;
×
501
    }
502
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
1,636✔
503
  }
504

505
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
1,227✔
506
}
507
static int compareVgDiskPrimary(const void *p1, const void *p2) {
2,863✔
508
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
2,863✔
509
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
2,863✔
510

511
  if (v1->dbId == v2->dbId) {
2,863✔
512
    if (v1->vgId == v2->vgId) {
1,636✔
513
      return 0;
×
514
    }
515
    return v1->vgId > v2->vgId ? 1 : -1;
1,636✔
516
  }
517

518
  return v1->dbId > v2->dbId ? 1 : -1;
1,227✔
519
}
520

521
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
409✔
522
  int32_t   code = 0, lino = 0;
409✔
523
  TdFilePtr pFile = NULL;
409✔
524
  char     *content = NULL;
409✔
525
  SJson    *pJson = NULL;
409✔
526
  int64_t   size = 0;
409✔
527
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
409✔
528
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
409✔
529
  SArray   *pDisks = NULL;
409✔
530
  // step 1: fetch clusterId from dnode.json
531
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
409✔
532
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
409✔
533
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
409✔
534
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
409✔
535
  if (taosReadFile(pFile, content, size) != size) {
409✔
536
    TAOS_CHECK_EXIT(terrno);
×
537
  }
538
  content[size] = '\0';
409✔
539
  pJson = tjsonParse(content);
409✔
540
  if (pJson == NULL) {
409✔
541
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
542
  }
543
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
409✔
544
  TAOS_CHECK_EXIT(code);
409✔
545
  if (dropped == 1) {
409✔
546
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
×
547
  }
548
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
409✔
549
  TAOS_CHECK_EXIT(code);
409✔
550
  if (encryptScope != 0) {
409✔
551
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
552
  }
553
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
409✔
554
  TAOS_CHECK_EXIT(code);
409✔
555
  if (clusterId == 0) {
409✔
556
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
×
557
  }
558
  pMountInfo->clusterId = clusterId;
409✔
559
  if (content != NULL) taosMemoryFreeClear(content);
409✔
560
  if (pJson != NULL) {
409✔
561
    cJSON_Delete(pJson);
409✔
562
    pJson = NULL;
409✔
563
  }
564
  if (pFile != NULL) taosCloseFile(&pFile);
409✔
565
  // step 2: fetch dataDir from dnode/config/local.json
566
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
409✔
567
  int32_t nDisks = taosArrayGetSize(pDisks);
409✔
568
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
409✔
569
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
570
  }
571
  for (int32_t i = 0; i < nDisks; ++i) {
2,863✔
572
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
2,454✔
573
    if (!pMountInfo->pDisks[pDisk->level]) {
2,454✔
574
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
818✔
575
      if (!pMountInfo->pDisks[pDisk->level]) {
818✔
576
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
577
      }
578
    }
579
    char *pDir = taosStrdup(pDisk->dir);
2,454✔
580
    if (pDir == NULL) {
2,454✔
581
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
582
    }
583
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
2,454✔
584
      // put the primary disk to the first position of level 0
585
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
409✔
586
        taosMemFree(pDir);
×
587
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
588
      }
589
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
4,090✔
590
      taosMemFree(pDir);
×
591
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
592
    }
593
  }
594
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
1,636✔
595
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
1,227✔
596
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
1,227✔
597
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
×
598
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
599
    }
600
  }
601
_exit:
409✔
602
  if (content != NULL) taosMemoryFreeClear(content);
409✔
603
  if (pJson != NULL) cJSON_Delete(pJson);
409✔
604
  if (pFile != NULL) taosCloseFile(&pFile);
409✔
605
  if (pDisks != NULL) {
409✔
606
    taosArrayDestroy(pDisks);
409✔
607
    pDisks = NULL;
409✔
608
  }
609
  if (code != 0) {
409✔
610
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
611
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
612
  } else {
613
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
409✔
614
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
615
  }
616
  TAOS_RETURN(code);
409✔
617
}
618

619
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
409✔
620
  int32_t       code = 0, lino = 0;
409✔
621
  SWrapperCfg  *pCfgs = NULL;
409✔
622
  int32_t       numOfVnodes = 0;
409✔
623
  char          path[TSDB_MOUNT_FPATH_LEN] = {0};
409✔
624
  TdDirPtr      pDir = NULL;
409✔
625
  TdDirEntryPtr de = NULL;
409✔
626
  SVnodeMgmt    vnodeMgmt = {0};
409✔
627
  SArray       *pVgCfgs = NULL;
409✔
628
  SArray       *pDbInfos = NULL;
409✔
629
  SArray       *pDiskPrimarys = NULL;
409✔
630

631
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
409✔
632
  vnodeMgmt.path = path;
409✔
633
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
409✔
634
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
409✔
635
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
409✔
636
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);
409✔
637

638
  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
409✔
639
  int32_t nVgDropped = 0, j = 0;
409✔
640
  for (int32_t i = 0; i < numOfVnodes; ++i) {
2,045✔
641
    SWrapperCfg *pCfg = &pCfgs[i];
1,636✔
642
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
643
    if (nDiskLevel0 > 1) {
1,636✔
644
      char *pDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
1,636✔
645
      if (!pDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
1,636✔
646
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *(char **)pDir, TD_DIRSEP, TD_DIRSEP,
1,636✔
647
                     pCfg->vgId);
648
    }
649
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
1,636✔
650
    if (pCfg->dropped) {
1,636✔
651
      ++nVgDropped;
×
652
      continue;
×
653
    }
654
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
1,636✔
655
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
×
656
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
×
657
    }
658
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
1,636✔
659
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
1,636✔
660
    if (pInfo->config.syncCfg.replicaNum > 1) {
1,636✔
661
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
×
662
             pInfo->config.syncCfg.replicaNum);
663
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
×
664
    } else if (pInfo->config.vgId != pCfg->vgId) {
1,636✔
665
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
×
666
             pCfg->vgId);
667
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
668
    } else if (pInfo->config.tdbEncryptData.encryptAlgrName[0] != '\0' ||
1,636✔
669
               pInfo->config.tsdbCfg.encryptData.encryptAlgrName[0] != '\0' ||
1,636✔
670
               pInfo->config.walCfg.encryptData.encryptAlgrName[0] != '\0') {
1,636✔
671
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%s wal:%s tsdb:%s", pReq->mountName, pCfg->path,
×
672
             pInfo->config.tdbEncryptData.encryptAlgrName, pInfo->config.walCfg.encryptData.encryptAlgrName,
673
             pInfo->config.tsdbCfg.encryptData.encryptAlgrName);
674
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
×
675
    }
676
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
1,636✔
677
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
1,636✔
678
  }
679
  if (nVgDropped > 0) {
409✔
680
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
×
681
    int32_t nVgToDrop = taosArrayGetSize(pVgCfgs) - nVgDropped;
×
682
    if (nVgToDrop > 0) taosArrayRemoveBatch(pVgCfgs, nVgToDrop - 1, nVgToDrop, NULL);
×
683
  }
684
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
409✔
685
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
409✔
686
  if (nVgCfg != nDiskPrimary) {
409✔
687
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
×
688
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
×
689
  }
690
  if (nVgCfg > 1) {
409✔
691
    taosArraySort(pVgCfgs, compareVnodeInfo);
409✔
692
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
409✔
693
  }
694

695
  int64_t clusterId = pMountInfo->clusterId;
409✔
696
  int64_t dbId = 0, vgId = 0, nDb = 0;
409✔
697
  for (int32_t i = 0; i < nVgCfg; ++i) {
1,497✔
698
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
1,225✔
699
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
1,225✔
700
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
137✔
701
             pInfo->config.syncCfg.nodeInfo->clusterId);
702
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
137✔
703
    }
704
    if (dbId != pInfo->config.dbId) {
1,088✔
705
      dbId = pInfo->config.dbId;
544✔
706
      ++nDb;
544✔
707
    }
708
    if (vgId == pInfo->config.vgId) {
1,088✔
709
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
×
710
    } else {
711
      vgId = pInfo->config.vgId;
1,088✔
712
    }
713
  }
714

715
  if (nDb > 0) {
272✔
716
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
272✔
717
    int32_t dbIdx = -1;
272✔
718
    for (int32_t i = 0; i < nVgCfg; ++i) {
1,360✔
719
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
1,088✔
720
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
1,088✔
721
      SMountDbInfo *pDbInfo = NULL;
1,088✔
722
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
1,088✔
723
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
544✔
724
        pDbInfo->dbId = pVgCfg->config.dbId;
544✔
725
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
544✔
726
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
544✔
727
      } else {
728
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
544✔
729
      }
730
      SMountVgInfo vgInfo = {
1,088✔
731
          .diskPrimary = pDiskPrimary->diskPrimary,
1,088✔
732
          .vgId = pVgCfg->config.vgId,
1,088✔
733
          .dbId = pVgCfg->config.dbId,
1,088✔
734
          .cacheLastSize = pVgCfg->config.cacheLastSize,
1,088✔
735
          .szPage = pVgCfg->config.szPage,
1,088✔
736
          .szCache = pVgCfg->config.szCache,
1,088✔
737
          .szBuf = pVgCfg->config.szBuf,
1,088✔
738
          .cacheLast = pVgCfg->config.cacheLast,
1,088✔
739
          .standby = pVgCfg->config.standby,
1,088✔
740
          .hashMethod = pVgCfg->config.hashMethod,
1,088✔
741
          .hashBegin = pVgCfg->config.hashBegin,
1,088✔
742
          .hashEnd = pVgCfg->config.hashEnd,
1,088✔
743
          .hashPrefix = pVgCfg->config.hashPrefix,
1,088✔
744
          .hashSuffix = pVgCfg->config.hashSuffix,
1,088✔
745
          .sttTrigger = pVgCfg->config.sttTrigger,
1,088✔
746
          .replications = pVgCfg->config.syncCfg.replicaNum,
1,088✔
747
          .precision = pVgCfg->config.tsdbCfg.precision,
1,088✔
748
          .compression = pVgCfg->config.tsdbCfg.compression,
1,088✔
749
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
1,088✔
750
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
1,088✔
751
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
1,088✔
752
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
1,088✔
753
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
1,088✔
754
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
1,088✔
755
          .minRows = pVgCfg->config.tsdbCfg.minRows,
1,088✔
756
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
1,088✔
757
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
1,088✔
758
          .ssChunkSize = pVgCfg->config.ssChunkSize,
1,088✔
759
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
1,088✔
760
          .ssCompact = pVgCfg->config.ssCompact,
1,088✔
761
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
1,088✔
762
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
1,088✔
763
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
1,088✔
764
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
1,088✔
765
          .walSegSize = pVgCfg->config.walCfg.segSize,
1,088✔
766
          .walLevel = pVgCfg->config.walCfg.level,
1,088✔
767
          .isAudit = pVgCfg->config.isAudit,
1,088✔
768
          .allowDrop = pVgCfg->config.allowDrop,
1,088✔
769
          .secureDelete = pVgCfg->config.secureDelete,
1,088✔
770
          //.encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
771
          .committed = pVgCfg->state.committed,
1,088✔
772
          .commitID = pVgCfg->state.commitID,
1,088✔
773
          .commitTerm = pVgCfg->state.commitTerm,
1,088✔
774
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
1,088✔
775
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
1,088✔
776
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
1,088✔
777
      };
778
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
2,176✔
779
    }
780
  }
781

782
  pMountInfo->pDbs = pDbInfos;
272✔
783

784
_exit:
409✔
785
  if (code != 0) {
409✔
786
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
137✔
787
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
788
  }
789
  taosArrayDestroy(pDiskPrimarys);
409✔
790
  taosArrayDestroy(pVgCfgs);
409✔
791
  taosMemoryFreeClear(pCfgs);
409✔
792
  TAOS_RETURN(code);
409✔
793
}
794

795
/**
796
 *   Retrieve the stables from vnode meta.
797
 */
798
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
272✔
799
  int32_t code = 0, lino = 0;
272✔
800
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
272✔
801
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
272✔
802
  SArray *suidList = NULL;
272✔
803
  SArray *pCols = NULL;
272✔
804
  SArray *pTags = NULL;
272✔
805
  SArray *pColExts = NULL;
272✔
806
  SArray *pTagExts = NULL;
272✔
807

808
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
272✔
809
  for (int32_t i = 0; i < nDb; ++i) {
816✔
810
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
544✔
811
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
544✔
812
    for (int32_t j = 0; j < nVg; ++j) {
544✔
813
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
544✔
814
      SVnode        vnode = {
544✔
815
                 .config.vgId = pVgInfo->vgId,
544✔
816
                 .config.dbId = pVgInfo->dbId,
544✔
817
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
544✔
818
                 .config.szPage = pVgInfo->szPage,
544✔
819
                 .config.szCache = pVgInfo->szCache,
544✔
820
                 .config.szBuf = pVgInfo->szBuf,
544✔
821
                 .config.cacheLast = pVgInfo->cacheLast,
544✔
822
                 .config.standby = pVgInfo->standby,
544✔
823
                 .config.hashMethod = pVgInfo->hashMethod,
544✔
824
                 .config.hashBegin = pVgInfo->hashBegin,
544✔
825
                 .config.hashEnd = pVgInfo->hashEnd,
544✔
826
                 .config.hashPrefix = pVgInfo->hashPrefix,
544✔
827
                 .config.hashSuffix = pVgInfo->hashSuffix,
544✔
828
                 .config.sttTrigger = pVgInfo->sttTrigger,
544✔
829
                 .config.syncCfg.replicaNum = pVgInfo->replications,
544✔
830
                 .config.tsdbCfg.precision = pVgInfo->precision,
544✔
831
                 .config.tsdbCfg.compression = pVgInfo->compression,
544✔
832
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
544✔
833
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
544✔
834
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
544✔
835
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
544✔
836
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
544✔
837
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
544✔
838
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
544✔
839
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
544✔
840
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
544✔
841
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
544✔
842
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
544✔
843
                 .config.ssCompact = pVgInfo->ssCompact,
544✔
844
                 .config.isAudit = pVgInfo->isAudit,
544✔
845
                 .config.allowDrop = pVgInfo->allowDrop,
544✔
846
                 .config.secureDelete = pVgInfo->secureDelete,
544✔
847
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
544✔
848
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
544✔
849
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
544✔
850
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
544✔
851
                 .config.walCfg.segSize = pVgInfo->walSegSize,
544✔
852
                 .config.walCfg.level = pVgInfo->walLevel,
544✔
853
          //.config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
854
                 .diskPrimary = pVgInfo->diskPrimary,
544✔
855
      };
856
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
544✔
857
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
544✔
858
               pVgInfo->vgId);
859
      vnode.path = path;
544✔
860

861
      int32_t rollback = vnodeShouldRollback(&vnode);
544✔
862
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
544✔
863
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
864
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
865
        TAOS_CHECK_EXIT(code);
×
866
      } else {
867
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
544✔
868
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
869

870
        SMetaReader mr = {0};
544✔
871
        tb_uid_t    suid = 0;
544✔
872
        SMeta      *pMeta = vnode.pMeta;
544✔
873

874
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
544✔
875
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
544✔
876
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
877
        }
878
        taosArrayClear(suidList);
544✔
879
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
544✔
880
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
544✔
881
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
882
        int32_t nStbs = taosArrayGetSize(suidList);
544✔
883
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
544✔
884
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
885
        }
886
        for (int32_t i = 0; i < nStbs; ++i) {
2,176✔
887
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
1,632✔
888
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
1,632✔
889
                pVgInfo->dbId, suid, pReq->dnodeId);
890
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
1,632✔
891
            TSDB_CHECK_CODE(code, lino, _exit0);
×
892
          }
893
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
1,632✔
894
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
1,632✔
895
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
896
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
897
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
898
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
899
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
900
          }
901
          SMountStbInfo stbInfo = {
1,632✔
902
              .req.source = TD_REQ_FROM_APP,
903
              .req.suid = suid,
904
              .req.colVer = mr.me.stbEntry.schemaRow.version,
1,632✔
905
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
1,632✔
906
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
1,632✔
907
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
1,632✔
908
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
1,632✔
909
          };
910
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
1,632✔
911
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
1,632✔
912
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
913
          }
914
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
1,632✔
915
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
916
          }
917

918
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
1,632✔
919
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
920
          }
921
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
1,632✔
922
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
923
          }
924
          taosArrayClear(pCols);
1,632✔
925
          taosArrayClear(pTags);
1,632✔
926
          taosArrayClear(pColExts);
1,632✔
927
          taosArrayClear(pTagExts);
1,632✔
928
          stbInfo.req.pColumns = pCols;
1,632✔
929
          stbInfo.req.pTags = pTags;
1,632✔
930
          stbInfo.pColExts = pColExts;
1,632✔
931
          stbInfo.pTagExts = pTagExts;
1,632✔
932

933
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
9,792✔
934
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
8,160✔
935
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
8,160✔
936
            SFieldWithOptions col = {
8,160✔
937
                .type = pSchema->type,
8,160✔
938
                .flags = pSchema->flags,
8,160✔
939
                .bytes = pSchema->bytes,
8,160✔
940
                .compress = pColComp->alg,
8,160✔
941
            };
942
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
8,160✔
943
            if (pSchema->colId != pColComp->id) {
8,160✔
944
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
945
            }
946
            if (mr.me.pExtSchemas) {
8,160✔
947
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
948
            }
949
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
8,160✔
950
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
16,320✔
951
          }
952
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
3,808✔
953
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
2,176✔
954
            SField   tag = {
2,176✔
955
                  .type = pSchema->type,
2,176✔
956
                  .flags = pSchema->flags,
2,176✔
957
                  .bytes = pSchema->bytes,
2,176✔
958
            };
959
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
2,176✔
960
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
2,176✔
961
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
4,352✔
962
          }
963
          tDecoderClear(&mr.coder);
1,632✔
964

965
          // serialize the SMountStbInfo
966
          int32_t firstPartLen = 0;
1,632✔
967
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
1,632✔
968
          if (msgLen <= 0) {
1,632✔
969
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
970
          }
971
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
1,632✔
972
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
1,632✔
973
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
1,632✔
974
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
1,632✔
975
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
1,632✔
976
            taosMemoryFree(pBuf);
×
977
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
978
          }
979
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
3,264✔
980
            taosMemoryFree(pBuf);
×
981
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
982
          }
983
        }
984
      _exit0:
544✔
985
        metaReaderClear(&mr);
544✔
986
        metaClose(&vnode.pMeta);
544✔
987
        TAOS_CHECK_EXIT(code);
544✔
988
      }
989
      break;  // retrieve stbs from one vnode is enough
544✔
990
    }
991
  }
992
_exit:
272✔
993
  if (code != 0) {
272✔
994
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
995
           pReq->dnodeId, tstrerror(code), path);
996
  }
997
  taosArrayDestroy(suidList);
272✔
998
  taosArrayDestroy(pCols);
272✔
999
  taosArrayDestroy(pTags);
272✔
1000
  taosArrayDestroy(pColExts);
272✔
1001
  taosArrayDestroy(pTagExts);
272✔
1002
  TAOS_RETURN(code);
272✔
1003
}
1004

1005
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
1,223✔
1006
  int32_t code = 0, lino = 0;
1,223✔
1007
  int32_t retryTimes = 0;
1,223✔
1008
  char    filepath[PATH_MAX] = {0};
1,223✔
1009
  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
1,223✔
1010
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
1,223✔
1011
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
1012
                  code, lino, _exit, terrno);
1013
  int32_t ret = 0;
1,223✔
1014
  do {
1015
    ret = taosLockFile(*pFile);
1,497✔
1016
    if (ret == 0) break;
1,497✔
1017
    taosMsleep(1000);
411✔
1018
    ++retryTimes;
411✔
1019
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
411✔
1020
  } while (retryTimes < retryLimit);
411✔
1021
  TAOS_CHECK_EXIT(ret);
1,223✔
1022
_exit:
1,223✔
1023
  if (code != 0) {
1,223✔
1024
    (void)taosCloseFile(pFile);
137✔
1025
    *pFile = NULL;
137✔
1026
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
137✔
1027
           filepath);
1028
  }
1029
  TAOS_RETURN(code);
1,223✔
1030
}
1031

1032
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
820✔
1033
  int32_t code = 0, lino = 0;
820✔
1034
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
820✔
1035
  TSDB_CHECK_CONDITION(taosCheckAccessFile(pReq->mountPath, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
820✔
1036
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
683✔
1037
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
546✔
1038
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
546✔
1039
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
409✔
1040
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
409✔
1041
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
409✔
1042
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
409✔
1043
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP,
409✔
1044
           TD_DIRSEP);
1045
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, O_RDONLY), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
409✔
1046
_exit:
820✔
1047
  if (code != 0) {
820✔
1048
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
411✔
1049
           pReq->dnodeId, tstrerror(code), path);
1050
  }
1051
  TAOS_RETURN(code);
820✔
1052
}
1053

1054
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
820✔
1055
                                       SMountInfo *pMountInfo) {
1056
  int32_t code = 0, lino = 0;
820✔
1057
  pMountInfo->dnodeId = pReq->dnodeId;
820✔
1058
  pMountInfo->mountUid = pReq->mountUid;
820✔
1059
  (void)snprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
820✔
1060
  (void)snprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
820✔
1061
  pMountInfo->ignoreExist = pReq->ignoreExist;
820✔
1062
  pMountInfo->valLen = pReq->valLen;
820✔
1063
  pMountInfo->pVal = pReq->pVal;
820✔
1064
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
820✔
1065
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
409✔
1066
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
409✔
1067
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
272✔
1068
_exit:
272✔
1069
  if (code != 0) {
820✔
1070
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
548✔
1071
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1072
  }
1073
  TAOS_RETURN(code);
820✔
1074
}
1075

1076
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
820✔
1077
  int32_t               code = 0, lino = 0;
820✔
1078
  int32_t               rspCode = 0;
820✔
1079
  SVnodeMgmt            vndMgmt = {0};
820✔
1080
  SMountInfo            mountInfo = {0};
820✔
1081
  void                 *pBuf = NULL;
820✔
1082
  int32_t               bufLen = 0;
820✔
1083
  SRetrieveMountPathReq req = {0};
820✔
1084

1085
  vndMgmt = *pMgmt;
820✔
1086
  vndMgmt.path = NULL;
820✔
1087
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
820✔
1088
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
820✔
1089
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);
820✔
1090
_end:
820✔
1091
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
820✔
1092
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
820✔
1093
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
820✔
1094
  pMsg->info.rsp = pBuf;
820✔
1095
  pMsg->info.rspLen = bufLen;
820✔
1096
_exit:
820✔
1097
  if (rspCode != 0) {
820✔
1098
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
1099
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
×
1100
           tstrerror(rspCode), req.dnodeId, req.mountPath);
1101
    rpcFreeCont(pBuf);
×
1102
    code = rspCode;
×
1103
  } else if (code != 0) {
820✔
1104
    // the client would receive the response with error msg
1105
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
548✔
1106
           req.dnodeId, tstrerror(code), req.mountPath);
1107
  } else {
1108
    int32_t nVgs = 0;
272✔
1109
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
272✔
1110
    for (int32_t i = 0; i < nDbs; ++i) {
816✔
1111
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, i);
544✔
1112
      nVgs += taosArrayGetSize(pDb->pVgs);
544✔
1113
    }
1114
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
272✔
1115
  }
1116
  taosMemFreeClear(vndMgmt.path);
820✔
1117
  tFreeMountInfo(&mountInfo, false);
820✔
1118
  TAOS_RETURN(code);
820✔
1119
}
1120

1121
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
1,088✔
1122
                            SMountVnodeReq *req, STfs *pMountTfs) {
1123
  int32_t    code = 0;
1,088✔
1124
  SVnodeInfo info = {0};
1,088✔
1125
  char       hostDir[TSDB_FILENAME_LEN] = {0};
1,088✔
1126
  char       mountDir[TSDB_FILENAME_LEN] = {0};
1,088✔
1127
  char       mountVnode[32] = {0};
1,088✔
1128

1129
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
1,088✔
1130
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1131
    return code;
×
1132
  }
1133

1134
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
1,088✔
1135
  if ((code = taosMkDir(hostDir))) {
1,088✔
1136
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1137
           tstrerror(code), hostDir);
1138
    return code;
×
1139
  }
1140

1141
  info.config = *pCfg;  // copy the config
1,088✔
1142
  info.state.committed = req->committed;
1,088✔
1143
  info.state.commitID = req->commitID;
1,088✔
1144
  info.state.commitTerm = req->commitTerm;
1,088✔
1145
  info.state.applied = req->committed;
1,088✔
1146
  info.state.applyTerm = req->commitTerm;
1,088✔
1147
  info.config.vndStats.numOfSTables = req->numOfSTables;
1,088✔
1148
  info.config.vndStats.numOfCTables = req->numOfCTables;
1,088✔
1149
  info.config.vndStats.numOfNTables = req->numOfNTables;
1,088✔
1150

1151
  SVnodeInfo oldInfo = {0};
1,088✔
1152
  oldInfo.config = vnodeCfgDefault;
1,088✔
1153
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
1,088✔
1154
    if (oldInfo.config.dbId != info.config.dbId) {
×
1155
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1156
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1157
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1158
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1159
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1160
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1161

1162
    } else {
1163
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1164
    }
1165
    return code;
×
1166
  }
1167

1168
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
1,088✔
1169
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
1,088✔
1170
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
1,088✔
1171
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
1,088✔
1172
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
1173
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
6,528✔
1174
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
5,440✔
1175
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
5,440✔
1176
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
5,440✔
1177
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1178
             mountSubDir, hostSubDir, tstrerror(code));
1179
      return code;
×
1180
    }
1181
  }
1182
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
1,088✔
1183
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
1,088✔
1184
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1185
           req->mountName, tstrerror(code), hostDir);
1186
    return code;
×
1187
  }
1188
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
1,088✔
1189
  return 0;
1,088✔
1190
}
1191

1192
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,088✔
1193
  int32_t          code = 0, lino = 0;
1,088✔
1194
  SMountVnodeReq   req = {0};
1,088✔
1195
  SCreateVnodeReq *pCreateReq = &req.createReq;
1,088✔
1196
  SVnodeCfg        vnodeCfg = {0};
1,088✔
1197
  SWrapperCfg      wrapperCfg = {0};
1,088✔
1198
  SVnode          *pImpl = NULL;
1,088✔
1199
  STfs            *pMountTfs = NULL;
1,088✔
1200
  char             path[TSDB_FILENAME_LEN] = {0};
1,088✔
1201
  bool             releaseTfs = false;
1,088✔
1202

1203
  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
1,088✔
1204
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
×
1205
    return TSDB_CODE_INVALID_MSG;
×
1206
  }
1207

1208
  if (pCreateReq->learnerReplica == 0) {
1,088✔
1209
    pCreateReq->learnerSelfIndex = -1;
1,088✔
1210
  }
1211
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
2,039✔
1212
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
1,088✔
1213
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
1214
  }
1215
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
1,088✔
1216
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
×
1217
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port, pCreateReq->replicas[i].id);
1218
  }
1219

1220
  SReplica *pReplica = NULL;
1,088✔
1221
  if (pCreateReq->selfIndex != -1) {
1,088✔
1222
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
1,088✔
1223
  } else {
1224
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
×
1225
  }
1226
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,088✔
1227
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,088✔
1228
    (void)tFreeSMountVnodeReq(&req);
×
1229
    code = TSDB_CODE_INVALID_MSG;
×
1230
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
×
1231
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
1232
    return code;
×
1233
  }
1234
  
1235
  if (taosWaitCfgKeyLoaded() != 0) {
1,088✔
1236
    (void)tFreeSMountVnodeReq(&req);
×
1237
    code = terrno;
×
1238
    dError("mount:%s, vgId:%d, failed to create vnode since encrypt key is not loaded, reason:%s", req.mountName,
×
1239
           pCreateReq->vgId, tstrerror(code));
1240
    return code;
×
1241
  }
1242

1243
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
1,088✔
1244
  vnodeCfg.mountVgId = req.mountVgId;
1,088✔
1245
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
1,088✔
1246
  wrapperCfg.mountId = req.mountId;
1,088✔
1247

1248
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
1,088✔
1249
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
1,088✔
1250
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
×
1251
    (void)tFreeSMountVnodeReq(&req);
×
1252
    vmReleaseVnode(pMgmt, pVnode);
×
1253
    code = TSDB_CODE_VND_ALREADY_EXIST;
×
1254
    return 0;
×
1255
  }
1256
  vmReleaseVnode(pMgmt, pVnode);
1,088✔
1257

1258
  wrapperCfg.diskPrimary = req.diskPrimary;
1,088✔
1259
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
1,088✔
1260
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
1,088✔
1261
  releaseTfs = true;
1,088✔
1262

1263
  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
1,088✔
1264
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
1,088✔
1265
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
×
1266
  }
1267
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
1,088✔
1268
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
×
1269
  }
1270
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
1,088✔
1271
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
1,088✔
1272
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
1,088✔
1273
_exit:
1,088✔
1274
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
1,088✔
1275
  if (code != 0) {
1,088✔
1276
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
×
1277
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
1278
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
×
1279
    vnodeClose(pImpl);
×
1280
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
1281
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
×
1282
  } else {
1283
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
1,088✔
1284
          TMSG_INFO(pMsg->msgType));
1285
  }
1286

1287
  pMsg->code = code;
1,088✔
1288
  pMsg->info.rsp = NULL;
1,088✔
1289
  pMsg->info.rspLen = 0;
1,088✔
1290

1291
  (void)tFreeSMountVnodeReq(&req);
1,088✔
1292
  TAOS_RETURN(code);
1,088✔
1293
}
1294
#endif  // USE_MOUNT
1295

1296
// alter replica doesn't use this, but restore dnode still use this
1297
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
2,027,268✔
1298
  SAlterVnodeTypeReq req = {0};
2,027,268✔
1299
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
2,027,268✔
1300
    terrno = TSDB_CODE_INVALID_MSG;
×
1301
    return -1;
×
1302
  }
1303

1304
  if (req.learnerReplicas == 0) {
1305
    req.learnerSelfIndex = -1;
1306
  }
1307

1308
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
2,027,268✔
1309
        TMSG_INFO(pMsg->msgType));
1310

1311
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
2,027,268✔
1312
  if (pVnode == NULL) {
2,027,268✔
1313
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1314
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1315
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1316
    return -1;
×
1317
  }
1318

1319
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
2,027,268✔
1320
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
2,027,268✔
1321
  if (role == TAOS_SYNC_ROLE_VOTER) {
2,027,268✔
1322
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1323
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1324
    vmReleaseVnode(pMgmt, pVnode);
×
1325
    return -1;
×
1326
  }
1327

1328
  dInfo("vgId:%d, checking node catch up", req.vgId);
2,027,268✔
1329
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
2,027,268✔
1330
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
1,926,724✔
1331
    vmReleaseVnode(pMgmt, pVnode);
1,926,724✔
1332
    return -1;
1,926,724✔
1333
  }
1334

1335
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
100,544✔
1336

1337
  int32_t vgId = req.vgId;
100,544✔
1338
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
100,544✔
1339
        req.selfIndex, req.strict, req.changeVersion);
1340
  for (int32_t i = 0; i < req.replica; ++i) {
402,184✔
1341
    SReplica *pReplica = &req.replicas[i];
301,640✔
1342
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
301,640✔
1343
  }
1344
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
100,544✔
1345
    SReplica *pReplica = &req.learnerReplicas[i];
×
1346
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
1347
  }
1348

1349
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
100,544✔
1350
      req.learnerSelfIndex >= req.learnerReplica) {
100,544✔
1351
    terrno = TSDB_CODE_INVALID_MSG;
×
1352
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1353
    vmReleaseVnode(pMgmt, pVnode);
×
1354
    return -1;
×
1355
  }
1356

1357
  SReplica *pReplica = NULL;
100,544✔
1358
  if (req.selfIndex != -1) {
100,544✔
1359
    pReplica = &req.replicas[req.selfIndex];
100,544✔
1360
  } else {
1361
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
1362
  }
1363

1364
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
100,544✔
1365
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
100,544✔
1366
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1367
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
×
1368
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1369
    vmReleaseVnode(pMgmt, pVnode);
×
1370
    return -1;
×
1371
  }
1372

1373
  dInfo("vgId:%d, start to close vnode", vgId);
100,544✔
1374
  SWrapperCfg wrapperCfg = {
100,544✔
1375
      .dropped = pVnode->dropped,
100,544✔
1376
      .vgId = pVnode->vgId,
100,544✔
1377
      .vgVersion = pVnode->vgVersion,
100,544✔
1378
      .diskPrimary = pVnode->diskPrimary,
100,544✔
1379
  };
1380
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
100,544✔
1381

1382
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
100,544✔
1383
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
100,544✔
1384

1385
  int32_t diskPrimary = wrapperCfg.diskPrimary;
100,544✔
1386
  char    path[TSDB_FILENAME_LEN] = {0};
100,544✔
1387
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
100,544✔
1388

1389
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
100,544✔
1390
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
100,544✔
1391
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1392
    return -1;
×
1393
  }
1394

1395
  dInfo("vgId:%d, begin to open vnode", vgId);
100,544✔
1396
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
100,544✔
1397
  if (pImpl == NULL) {
100,544✔
1398
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1399
    return -1;
×
1400
  }
1401

1402
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
100,544✔
1403
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1404
    return -1;
×
1405
  }
1406

1407
  if (vnodeStart(pImpl) != 0) {
100,544✔
1408
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1409
    return -1;
×
1410
  }
1411

1412
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
100,544✔
1413
        req.vgId, TMSG_INFO(pMsg->msgType));
1414
  return 0;
100,544✔
1415
}
1416

1417
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
1418
  SCheckLearnCatchupReq req = {0};
×
1419
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
1420
    terrno = TSDB_CODE_INVALID_MSG;
×
1421
    return -1;
×
1422
  }
1423

1424
  if (req.learnerReplicas == 0) {
1425
    req.learnerSelfIndex = -1;
1426
  }
1427

1428
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
1429
        TMSG_INFO(pMsg->msgType));
1430

1431
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
1432
  if (pVnode == NULL) {
×
1433
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
1434
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1435
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1436
    return -1;
×
1437
  }
1438

1439
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
1440
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
1441
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
1442
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
1443
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
1444
    vmReleaseVnode(pMgmt, pVnode);
×
1445
    return -1;
×
1446
  }
1447

1448
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
1449
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
1450
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
1451
    vmReleaseVnode(pMgmt, pVnode);
×
1452
    return -1;
×
1453
  }
1454

1455
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
1456

1457
  vmReleaseVnode(pMgmt, pVnode);
×
1458

1459
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
1460
        TMSG_INFO(pMsg->msgType));
1461

1462
  return 0;
×
1463
}
1464

1465
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
27,193✔
1466
  SDisableVnodeWriteReq req = {0};
27,193✔
1467
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
27,193✔
1468
    terrno = TSDB_CODE_INVALID_MSG;
×
1469
    return -1;
×
1470
  }
1471

1472
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
27,193✔
1473

1474
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
27,193✔
1475
  if (pVnode == NULL) {
27,193✔
1476
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
1477
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1478
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1479
    return -1;
×
1480
  }
1481

1482
  pVnode->disable = req.disable;
27,193✔
1483
  vmReleaseVnode(pMgmt, pVnode);
27,193✔
1484
  return 0;
27,193✔
1485
}
1486

1487
int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
3,463✔
1488
  SMsgHead *pHead = pMsg->pCont;
3,463✔
1489
  pHead->contLen = ntohl(pHead->contLen);
3,463✔
1490
  pHead->vgId = ntohl(pHead->vgId);
3,463✔
1491

1492
  SVndSetKeepVersionReq req = {0};
3,463✔
1493
  if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
3,463✔
1494
                                        &req) != 0) {
1495
    terrno = TSDB_CODE_INVALID_MSG;
×
1496
    return -1;
×
1497
  }
1498

1499
  dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion);
3,463✔
1500

1501
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
3,463✔
1502
  if (pVnode == NULL) {
3,463✔
1503
    dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr());
×
1504
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1505
    return -1;
×
1506
  }
1507

1508
  // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms)
1509
  // This bypasses Raft to avoid timing issues where WAL might be deleted
1510
  // before keepVersion is set through the Raft consensus process
1511
  int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion);
3,463✔
1512
  if (code != TSDB_CODE_SUCCESS) {
3,463✔
1513
    dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code));
×
1514
    terrno = code;
×
1515
    vmReleaseVnode(pMgmt, pVnode);
×
1516
    return -1;
×
1517
  }
1518

1519
  dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion);
3,463✔
1520

1521
  vmReleaseVnode(pMgmt, pVnode);
3,463✔
1522
  return 0;
3,463✔
1523
}
1524

1525
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
26,447✔
1526
  SAlterVnodeHashRangeReq req = {0};
26,447✔
1527
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
26,447✔
1528
    terrno = TSDB_CODE_INVALID_MSG;
×
1529
    return -1;
×
1530
  }
1531

1532
  int32_t srcVgId = req.srcVgId;
26,447✔
1533
  int32_t dstVgId = req.dstVgId;
26,447✔
1534

1535
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
26,447✔
1536
  if (pVnode != NULL) {
26,447✔
1537
    dError("vgId:%d, vnode already exist", dstVgId);
×
1538
    vmReleaseVnode(pMgmt, pVnode);
×
1539
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
1540
    return -1;
×
1541
  }
1542

1543
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
26,447✔
1544
        req.dstVgId);
1545
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
26,447✔
1546
  if (pVnode == NULL) {
26,447✔
1547
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
1548
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1549
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1550
    return -1;
×
1551
  }
1552

1553
  SWrapperCfg wrapperCfg = {
26,447✔
1554
      .dropped = pVnode->dropped,
26,447✔
1555
      .vgId = dstVgId,
1556
      .vgVersion = pVnode->vgVersion,
26,447✔
1557
      .diskPrimary = pVnode->diskPrimary,
26,447✔
1558
  };
1559
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
26,447✔
1560

1561
  // prepare alter
1562
  pVnode->toVgId = dstVgId;
26,447✔
1563
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
26,447✔
1564
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1565
    return -1;
×
1566
  }
1567

1568
  dInfo("vgId:%d, close vnode", srcVgId);
26,447✔
1569
  vmCloseVnode(pMgmt, pVnode, true, false);
26,447✔
1570

1571
  int32_t diskPrimary = wrapperCfg.diskPrimary;
26,447✔
1572
  char    srcPath[TSDB_FILENAME_LEN] = {0};
26,447✔
1573
  char    dstPath[TSDB_FILENAME_LEN] = {0};
26,447✔
1574
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
26,447✔
1575
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
26,447✔
1576

1577
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
26,447✔
1578
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
26,447✔
1579
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
1580
    return -1;
×
1581
  }
1582

1583
  dInfo("vgId:%d, open vnode", dstVgId);
26,447✔
1584
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
26,447✔
1585

1586
  if (pImpl == NULL) {
26,447✔
1587
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
1588
    return -1;
×
1589
  }
1590

1591
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
26,447✔
1592
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
1593
    return -1;
×
1594
  }
1595

1596
  if (vnodeStart(pImpl) != 0) {
26,447✔
1597
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
1598
    return -1;
×
1599
  }
1600

1601
  // complete alter
1602
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
26,447✔
1603
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
1604
    return -1;
×
1605
  }
1606

1607
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
26,447✔
1608
  return 0;
26,447✔
1609
}
1610

1611
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
698,466✔
1612
  SAlterVnodeReplicaReq alterReq = {0};
698,466✔
1613
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
698,466✔
1614
    terrno = TSDB_CODE_INVALID_MSG;
×
1615
    return -1;
×
1616
  }
1617

1618
  if (alterReq.learnerReplica == 0) {
698,466✔
1619
    alterReq.learnerSelfIndex = -1;
499,801✔
1620
  }
1621

1622
  int32_t vgId = alterReq.vgId;
698,466✔
1623
  dInfo(
698,466✔
1624
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1625
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
1626
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1627
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
1628

1629
  for (int32_t i = 0; i < alterReq.replica; ++i) {
2,723,395✔
1630
    SReplica *pReplica = &alterReq.replicas[i];
2,024,929✔
1631
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
2,024,929✔
1632
  }
1633
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
897,131✔
1634
    SReplica *pReplica = &alterReq.learnerReplicas[i];
198,665✔
1635
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
198,665✔
1636
  }
1637

1638
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
698,466✔
1639
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
698,466✔
1640
    terrno = TSDB_CODE_INVALID_MSG;
×
1641
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
1642
    return -1;
×
1643
  }
1644

1645
  SReplica *pReplica = NULL;
698,466✔
1646
  if (alterReq.selfIndex != -1) {
698,466✔
1647
    pReplica = &alterReq.replicas[alterReq.selfIndex];
698,466✔
1648
  } else {
1649
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
1650
  }
1651

1652
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
698,466✔
1653
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
698,466✔
1654
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1655
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
×
1656
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
1657
    return -1;
×
1658
  }
1659

1660
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
698,466✔
1661
  if (pVnode == NULL) {
698,466✔
1662
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1663
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1664
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
1665
    return -1;
×
1666
  }
1667

1668
  dInfo("vgId:%d, start to close vnode", vgId);
698,466✔
1669
  SWrapperCfg wrapperCfg = {
698,466✔
1670
      .dropped = pVnode->dropped,
698,466✔
1671
      .vgId = pVnode->vgId,
698,466✔
1672
      .vgVersion = pVnode->vgVersion,
698,466✔
1673
      .diskPrimary = pVnode->diskPrimary,
698,466✔
1674
  };
1675
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
698,466✔
1676

1677
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
698,466✔
1678
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
698,466✔
1679

1680
  int32_t diskPrimary = wrapperCfg.diskPrimary;
698,466✔
1681
  char    path[TSDB_FILENAME_LEN] = {0};
698,466✔
1682
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
698,466✔
1683

1684
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
698,466✔
1685
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
698,466✔
1686
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
1687
    return -1;
×
1688
  }
1689

1690
  dInfo("vgId:%d, begin to open vnode", vgId);
698,466✔
1691
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
698,466✔
1692
  if (pImpl == NULL) {
698,466✔
1693
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
1694
    return -1;
×
1695
  }
1696

1697
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
698,466✔
1698
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
1699
    return -1;
×
1700
  }
1701

1702
  if (vnodeStart(pImpl) != 0) {
698,466✔
1703
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
1704
    return -1;
×
1705
  }
1706

1707
  dInfo(
698,466✔
1708
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
1709
      "learnerSelfIndex:%d strict:%d",
1710
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
1711
      alterReq.learnerSelfIndex, alterReq.strict);
1712
  return 0;
698,466✔
1713
}
1714

1715
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
20,486✔
1716
  SAlterVnodeElectBaselineReq alterReq = {0};
20,486✔
1717
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
20,486✔
1718
    return TSDB_CODE_INVALID_MSG;
×
1719
  }
1720

1721
  int32_t vgId = alterReq.vgId;
20,486✔
1722
  dInfo(
20,486✔
1723
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
1724
      vgId, TMSG_INFO(pMsg->msgType), alterReq.electBaseLine);
1725

1726
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
20,486✔
1727
  if (pVnode == NULL) {
20,486✔
1728
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
1729
    return terrno;
×
1730
  }
1731

1732
  if(vnodeSetElectBaseline(pVnode->pImpl, alterReq.electBaseLine) != 0){
20,486✔
1733
    vmReleaseVnode(pMgmt, pVnode);
×
1734
    return -1;
×
1735
  }
1736

1737
  vmReleaseVnode(pMgmt, pVnode);
20,486✔
1738
  return 0;
20,486✔
1739
}
1740

1741
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,733,691✔
1742
  int32_t       code = 0;
1,733,691✔
1743
  SDropVnodeReq dropReq = {0};
1,733,691✔
1744
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
1,733,691✔
1745
    terrno = TSDB_CODE_INVALID_MSG;
×
1746
    return terrno;
×
1747
  }
1748

1749
  int32_t vgId = dropReq.vgId;
1,733,691✔
1750
  dInfo("vgId:%d, start to drop vnode", vgId);
1,733,691✔
1751

1752
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
1,733,691✔
1753
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1754
    dError("vgId:%d, dnodeId:%d, %s", dropReq.vgId, dropReq.dnodeId, tstrerror(terrno));
×
1755
    return terrno;
×
1756
  }
1757

1758
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
1,733,691✔
1759
  if (pVnode == NULL) {
1,733,691✔
1760
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
1761
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
1762
    return terrno;
×
1763
  }
1764

1765
  pVnode->dropped = 1;
1,733,691✔
1766
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
1,733,691✔
1767
    pVnode->dropped = 0;
×
1768
    vmReleaseVnode(pMgmt, pVnode);
×
1769
    return code;
×
1770
  }
1771

1772
  vmCloseVnode(pMgmt, pVnode, false, false);
1,733,691✔
1773
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
1,733,691✔
1774
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
1775
  }
1776

1777
  dInfo("vgId:%d, is dropped", vgId);
1,733,691✔
1778
  return 0;
1,733,691✔
1779
}
1780

1781
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
122,840✔
1782
  SVArbHeartBeatReq arbHbReq = {0};
122,840✔
1783
  SVArbHeartBeatRsp arbHbRsp = {0};
122,840✔
1784
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
122,840✔
1785
    terrno = TSDB_CODE_INVALID_MSG;
×
1786
    return -1;
×
1787
  }
1788

1789
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
122,840✔
1790
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
×
1791
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
×
1792
    goto _OVER;
×
1793
  }
1794

1795
  if (strlen(arbHbReq.arbToken) == 0) {
122,840✔
1796
    terrno = TSDB_CODE_INVALID_MSG;
×
1797
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
1798
    goto _OVER;
×
1799
  }
1800

1801
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
122,840✔
1802

1803
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
122,840✔
1804
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
122,840✔
1805
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
122,840✔
1806
  if (arbHbRsp.hbMembers == NULL) {
122,840✔
1807
    goto _OVER;
×
1808
  }
1809

1810
  for (int32_t i = 0; i < size; i++) {
261,666✔
1811
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
138,826✔
1812
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
138,826✔
1813
    if (pVnode == NULL) {
138,826✔
1814
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
28,821✔
1815
      continue;
28,821✔
1816
    }
1817

1818
    SVArbHbRspMember rspMember = {0};
110,005✔
1819
    rspMember.vgId = pReqMember->vgId;
110,005✔
1820
    rspMember.hbSeq = pReqMember->hbSeq;
110,005✔
1821
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
110,005✔
1822
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
1823
      vmReleaseVnode(pMgmt, pVnode);
×
1824
      continue;
×
1825
    }
1826

1827
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
110,005✔
1828
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
1829
      vmReleaseVnode(pMgmt, pVnode);
×
1830
      continue;
×
1831
    }
1832

1833
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
220,010✔
1834
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
1835
      vmReleaseVnode(pMgmt, pVnode);
×
1836
      goto _OVER;
×
1837
    }
1838

1839
    vmReleaseVnode(pMgmt, pVnode);
110,005✔
1840
  }
1841

1842
  SRpcMsg rspMsg = {.info = pMsg->info};
122,840✔
1843
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
122,840✔
1844
  if (rspLen < 0) {
122,840✔
1845
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1846
    goto _OVER;
×
1847
  }
1848

1849
  void *pRsp = rpcMallocCont(rspLen);
122,840✔
1850
  if (pRsp == NULL) {
122,840✔
1851
    terrno = terrno;
×
1852
    goto _OVER;
×
1853
  }
1854

1855
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
122,840✔
1856
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1857
    rpcFreeCont(pRsp);
×
1858
    goto _OVER;
×
1859
  }
1860
  pMsg->info.rsp = pRsp;
122,840✔
1861
  pMsg->info.rspLen = rspLen;
122,840✔
1862

1863
  terrno = TSDB_CODE_SUCCESS;
122,840✔
1864

1865
_OVER:
122,840✔
1866
  tFreeSVArbHeartBeatReq(&arbHbReq);
122,840✔
1867
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
122,840✔
1868
  return terrno;
122,840✔
1869
}
1870

1871
int32_t vmProcessDnodeQueryCompactProgressReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
140,290✔
1872
  int32_t                       code = 0;
140,290✔
1873
  SDnodeQueryCompactProgressReq req = {0};
140,290✔
1874
  void                         *pRsp = NULL;
140,290✔
1875
  SVnodeObj                   **ppVnodes = NULL;
140,290✔
1876
  int32_t                       numOfVnodes = 0;
140,290✔
1877

1878
  code = tDeserializeSDnodeQueryCompactProgressReq(pMsg->pCont, pMsg->contLen, &req);
140,290✔
1879
  if (code != 0) {
140,290✔
NEW
1880
    dError("dnode:%d, failed to deserialize dnode-query-compact-progress req, code:%s",
×
1881
           pMgmt->pData->dnodeId, tstrerror(code));
NEW
1882
    goto _exit;
×
1883
  }
1884

1885
  dDebug("dnode:%d, receive dnode-query-compact-progress req, compactId:%d", pMgmt->pData->dnodeId, req.compactId);
140,290✔
1886

1887
  // collect compact progress from all running vnodes
1888
  SArray *pProgressArray = taosArrayInit(16, sizeof(SQueryCompactProgressRsp));
140,290✔
1889
  if (pProgressArray == NULL) {
140,290✔
NEW
1890
    code = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
1891
    goto _exit;
×
1892
  }
1893

1894
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
140,290✔
1895
  if (code != 0) {
140,290✔
NEW
1896
    dError("dnode:%d, failed to get vnode list, code:%s", pMgmt->pData->dnodeId, tstrerror(code));
×
NEW
1897
    taosArrayDestroy(pProgressArray);
×
NEW
1898
    goto _exit;
×
1899
  }
1900

1901
  for (int32_t i = 0; i < numOfVnodes; i++) {
629,939✔
1902
    SVnodeObj *pVnode = ppVnodes[i];
489,649✔
1903
    if (pVnode == NULL) {
489,649✔
NEW
1904
      continue;
×
1905
    }
1906
    if (pVnode->failed || pVnode->pImpl == NULL) {
489,649✔
NEW
1907
      vmReleaseVnode(pMgmt, pVnode);
×
NEW
1908
      continue;
×
1909
    }
1910
#ifdef TD_ENTERPRISE
1911
    SQueryCompactProgressRsp vnodeRsp = {0};
489,649✔
1912
    vnodeRsp.dnodeId = pMgmt->pData->dnodeId;
489,649✔
1913
    if (vnodeGetCompactProgress(pVnode->pImpl, req.compactId, &vnodeRsp) == 0 && vnodeRsp.compactId != 0) {
489,649✔
1914
      if (taosArrayPush(pProgressArray, &vnodeRsp) == NULL) {
489,649✔
NEW
1915
        dError("dnode:%d, vgId:%d, failed to push compact progress", pMgmt->pData->dnodeId, pVnode->vgId);
×
1916
      }
1917
    }
1918
#endif
1919
    vmReleaseVnode(pMgmt, pVnode);
489,649✔
1920
  }
1921
  taosMemoryFree(ppVnodes);
140,290✔
1922

1923
  SDnodeQueryCompactProgressRsp rsp = {0};
140,290✔
1924
  rsp.dnodeId       = pMgmt->pData->dnodeId;
140,290✔
1925
  rsp.numOfVnodes   = (int32_t)taosArrayGetSize(pProgressArray);
140,290✔
1926
  rsp.vnodeProgress = (rsp.numOfVnodes > 0) ? (SQueryCompactProgressRsp *)taosArrayGet(pProgressArray, 0) : NULL;
140,290✔
1927

1928
  dInfo("dnode:%d, send dnode-query-compact-progress rsp, numOfVnodes:%d", rsp.dnodeId, rsp.numOfVnodes);
140,290✔
1929

1930
  int32_t rspLen = tSerializeSDnodeQueryCompactProgressRsp(NULL, 0, &rsp);
140,290✔
1931
  if (rspLen < 0) {
140,290✔
NEW
1932
    code = rspLen;
×
NEW
1933
    taosArrayDestroy(pProgressArray);
×
NEW
1934
    goto _exit;
×
1935
  }
1936

1937
  pRsp = rpcMallocCont(rspLen);
140,290✔
1938
  if (pRsp == NULL) {
140,290✔
NEW
1939
    code = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
1940
    taosArrayDestroy(pProgressArray);
×
NEW
1941
    goto _exit;
×
1942
  }
1943

1944
  if (tSerializeSDnodeQueryCompactProgressRsp(pRsp, rspLen, &rsp) < 0) {
140,290✔
NEW
1945
    code = TSDB_CODE_INVALID_MSG;
×
NEW
1946
    rpcFreeCont(pRsp);
×
NEW
1947
    pRsp = NULL;
×
NEW
1948
    taosArrayDestroy(pProgressArray);
×
NEW
1949
    goto _exit;
×
1950
  }
1951

1952
  taosArrayDestroy(pProgressArray);
140,290✔
1953
  pMsg->info.rsp    = pRsp;
140,290✔
1954
  pMsg->info.rspLen = rspLen;
140,290✔
1955

1956
_exit:
140,290✔
1957
  return code;
140,290✔
1958
}
1959

1960
SArray *vmGetMsgHandles() {
643,013✔
1961
  int32_t code = -1;
643,013✔
1962
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
643,013✔
1963
  if (pArray == NULL) goto _OVER;
643,013✔
1964

1965
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1966
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
643,013✔
1967
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
643,013✔
1968
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
643,013✔
1969
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1970
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1971
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1972
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1973
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1974
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1975
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1976
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1977
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1978
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1979
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1980
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1981
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1982
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1983
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1984
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1985
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1986
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1987
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1988
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1989
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1990
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1991
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1992
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1993
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
643,013✔
1994
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1995
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
1996
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1997
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1998
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
1999
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2000
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2001
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2002
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2003
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2004
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2005
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2006
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2007
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2008
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2009
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2010
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2011
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2012

2013
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2014
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2015
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2016
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2017
  if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2018
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2019
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2020
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2021
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2022
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2023
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2024
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
643,013✔
2025
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2026
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2027
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2028
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2029
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2030
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2031
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2032
  if (dmSetMgmtHandle(pArray, TDMT_DND_QUERY_COMPACT_PROGRESS, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2033
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2034
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2035
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2036
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2037

2038
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2039
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2040
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2041
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2042
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2043
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2044
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2045
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2046
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2047
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2048
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2049
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2050

2051
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
643,013✔
2052
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
643,013✔
2053
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
643,013✔
2054
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
643,013✔
2055
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
643,013✔
2056

2057
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
643,013✔
2058
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2059
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
643,013✔
2060

2061
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
643,013✔
2062
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
643,013✔
2063
  if (dmSetMgmtHandle(pArray, TDMT_VND_AUDIT_RECORD, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
643,013✔
2064

2065
  code = 0;
643,013✔
2066

2067
_OVER:
643,013✔
2068
  if (code != 0) {
643,013✔
2069
    taosArrayDestroy(pArray);
×
2070
    return NULL;
×
2071
  } else {
2072
    return pArray;
643,013✔
2073
  }
2074
}
2075

2076
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
×
2077
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
×
2078

2079
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
2080
  while (pIter) {
×
2081
    SVnodeObj **ppVnode = pIter;
×
2082
    if (ppVnode == NULL || *ppVnode == NULL) {
×
2083
      continue;
×
2084
    }
2085

2086
    SVnodeObj *pVnode = *ppVnode;
×
2087
    if (!pVnode->failed) {
×
2088
      SRawWriteMetrics metrics = {0};
×
2089
      if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) == 0) {
×
2090
        // Add the metrics to the global metrics system with cluster ID
2091
        SName   name = {0};
×
2092
        int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2093
        if (code < 0) {
×
2094
          dError("failed to get db name since %s", tstrerror(code));
×
2095
          continue;
×
2096
        }
2097
        code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
×
2098
        if (code != TSDB_CODE_SUCCESS) {
×
2099
          dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
×
2100
        } else {
2101
          // After successfully adding metrics, reset the vnode's write metrics using atomic operations
2102
          if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
×
2103
            dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
×
2104
          }
2105
        }
2106
      } else {
2107
        dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
×
2108
      }
2109
    }
2110
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
2111
  }
2112

2113
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
2114
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc