• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.9
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "metrics.h"
18
#include "taos_monitor.h"
19
#include "vmInt.h"
20
#include "vnd.h"
21
#include "vnodeInt.h"
22

23
extern taos_counter_t *tsInsertCounter;
24

25
// Forward declaration for function defined in metrics.c
26
extern int32_t addWriteMetrics(int32_t vgId, int32_t dnodeId, int64_t clusterId, const char *dnodeEp,
27
                               const char *dbname, const SRawWriteMetrics *pRawMetrics);
28

29
/**
 * Build a load snapshot for every vnode currently stored in the running hash.
 *
 * A fresh array is stored in pInfo->pVloads (NULL when the allocation fails,
 * in which case nothing else is done). The running hash is walked under the
 * hash read lock and one SVnodeLoad is appended per vnode; a vnode flagged as
 * failed contributes an entry that only carries its vgId.
 *
 * @param pMgmt   vnode management object whose running hash is scanned
 * @param pInfo   output holder; pInfo->pVloads is overwritten with the new array
 * @param isReset when true, vnodeResetLoad() is invoked for every non-failed vnode
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The advance lives in the for-header so that `continue` always moves to the
  // next entry; with the advance at the bottom of a while-body, skipping an
  // empty slot would re-test the same iterator forever while holding the lock.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode == NULL || *ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    // Best effort: a failed push is logged and the scan continues.
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
58

59
/**
 * Apply the configured election interval (tsVnodeElectIntervalMs) as the sync
 * timeout of every vnode in the running hash.
 *
 * The hash is walked under the hash read lock. A failure on one vnode is
 * logged and does not stop the scan.
 *
 * @param pMgmt vnode management object whose running hash is scanned
 */
void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-header guarantees that `continue` moves on to the
  // next entry instead of re-testing the same iterator forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode == NULL || *ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;

    if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
      dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
77

78
/**
 * Build a lightweight load snapshot (SVnodeLoadLite) for the running vnodes.
 *
 * A fresh array is stored in pInfo->pVloads. Only vnodes that are not flagged
 * as failed and for which vnodeGetLoadLite() succeeds are appended. If an
 * append fails, the array is destroyed, pInfo->pVloads is reset to NULL and the
 * scan stops, so callers must treat NULL as "no data".
 *
 * @param pMgmt vnode management object whose running hash is scanned
 * @param pInfo output holder; pInfo->pVloads is overwritten
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // Advancing in the for-header guarantees that `continue` moves on to the
  // next entry instead of re-testing the same iterator forever.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode == NULL || *ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // NOTE(review): the iteration is abandoned mid-way here; confirm whether
      // the hash API requires the iterator to be cancelled before leaving.
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
105

UNCOV
106
/**
 * Aggregate per-vnode load counters into monitor statistics.
 *
 * Takes a resetting load snapshot via vmGetVnodeLoads(), sums the request
 * counters, counts vnodes whose sync state is leader or assigned leader, and
 * writes the totals to both pInfo->vstat and pMgmt->state. The tfs monitor
 * info is then fetched into pInfo->tfs. Returns without touching either
 * output when the snapshot could not be allocated.
 *
 * @param pMgmt vnode management object; its `state` totals are refreshed
 * @param pInfo monitor output; `vstat` and `tfs` are filled in
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo snapshot = {0};
  vmGetVnodeLoads(pMgmt, &snapshot, true);

  SArray *pLoads = snapshot.pVloads;
  if (pLoads == NULL) return;

  int32_t nVnodes = 0;
  int32_t nLeaders = 0;
  int64_t nSelect = 0;
  int64_t nInsert = 0;
  int64_t nInsertOk = 0;
  int64_t nBatchInsert = 0;
  int64_t nBatchInsertOk = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pLoads); ++idx) {
    SVnodeLoad *pItem = taosArrayGet(pLoads, idx);

    nSelect += pItem->numOfSelectReqs;
    nInsert += pItem->numOfInsertReqs;
    nInsertOk += pItem->numOfInsertSuccessReqs;
    nBatchInsert += pItem->numOfBatchInsertReqs;
    nBatchInsertOk += pItem->numOfBatchInsertSuccessReqs;

    switch (pItem->syncState) {
      case TAOS_SYNC_STATE_LEADER:
      case TAOS_SYNC_STATE_ASSIGNED_LEADER:
        nLeaders++;
        break;
      default:
        break;
    }
    nVnodes++;
  }

  // Report to the monitor (the insert counters are deltas, see the reset above).
  pInfo->vstat.totalVnodes = nVnodes;
  pInfo->vstat.masterNum = nLeaders;
  pInfo->vstat.numOfSelectReqs = nSelect;
  pInfo->vstat.numOfInsertReqs = nInsert;
  pInfo->vstat.numOfInsertSuccessReqs = nInsertOk;
  pInfo->vstat.numOfBatchInsertReqs = nBatchInsert;
  pInfo->vstat.numOfBatchInsertSuccessReqs = nBatchInsertOk;

  // Mirror the same totals into the management state.
  pMgmt->state.totalVnodes = nVnodes;
  pMgmt->state.masterNum = nLeaders;
  pMgmt->state.numOfSelectReqs = nSelect;
  pMgmt->state.numOfInsertReqs = nInsert;
  pMgmt->state.numOfInsertSuccessReqs = nInsertOk;
  pMgmt->state.numOfBatchInsertReqs = nBatchInsert;
  pMgmt->state.numOfBatchInsertSuccessReqs = nBatchInsertOk;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
154

UNCOV
155
/**
 * Remove insert-counter samples whose vgroup is no longer in the running hash.
 *
 * The keys and vgroup ids currently held by tsInsertCounter are fetched; every
 * key whose vgroup id has no entry in pMgmt->runngingHash is deleted from the
 * counter. The two arrays returned by the counter are released at the end.
 *
 * @param pMgmt vnode management object providing the running hash
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int nSamples = taos_counter_get_keys_size(tsInsertCounter);
  if (nSamples == 0) return;

  int32_t *pVgIds;
  char   **ppKeys;
  int      rc = taos_counter_get_vgroup_ids(tsInsertCounter, &ppKeys, &pVgIds, &nSamples);
  if (rc) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (int idx = 0; idx < nSamples; idx++) {
    int32_t vgId = pVgIds[idx];
    // A vgroup still present in the running hash keeps its sample.
    if (taosHashGet(pMgmt->runngingHash, &vgId, sizeof(int32_t)) != NULL) continue;

    rc = taos_counter_delete(tsInsertCounter, ppKeys[idx]);
    if (rc) {
      dError("failed to delete monitor sample key:%s", ppKeys[idx]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (pVgIds) taosMemoryFree(pVgIds);
  if (ppKeys) taosMemoryFree(ppKeys);
}
182

183
/**
 * Drop metrics that belong to vgroups which are no longer running.
 *
 * Does nothing unless monitoring and metrics are both enabled and a monitor
 * endpoint (tsMonitorFqdn / tsMonitorPort) is configured. Otherwise the vgIds
 * of all vnodes in the running hash are collected into a temporary set, which
 * is handed to cleanupExpiredMetrics() and then released.
 *
 * @param pMgmt vnode management object providing the running hash
 */
void vmCleanExpiredMetrics(SVnodeMgmt *pMgmt) {
  if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0 || !tsEnableMetrics) {
    return;
  }

  // Allocate the set before taking the lock and before starting the iteration,
  // so the allocation-failure path never returns with an iteration in progress.
  SHashObj *pValidVgroups = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pValidVgroups == NULL) {
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode && *ppVnode) {
      int32_t vgId = (*ppVnode)->vgId;
      char    dummy = 1;  // hash table value (we only care about the key)
      if (taosHashPut(pValidVgroups, &vgId, sizeof(int32_t), &dummy, sizeof(char)) != 0) {
        dError("failed to put vgId:%d to valid vgroups hash", vgId);
      }
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // Clean expired metrics by removing metrics for non-existent vgroups
  int32_t code = cleanupExpiredMetrics(pValidVgroups);
  if (code != TSDB_CODE_SUCCESS) {
    dError("failed to clean expired metrics, code:%d", code);
  }

  taosHashCleanup(pValidVgroups);
}
217

218
/**
 * Translate a create-vnode request into a vnode configuration.
 *
 * pCfg starts as a copy of vnodeCfgDefault and is then overwritten field by
 * field from pCreate (storage, tsdb, wal, encryption, hash range and shared
 * storage settings). The sync node list is rebuilt from scratch: voters come
 * first (from pCreate->replicas), learners follow (from
 * pCreate->learnerReplicas). syncCfg.myIndex is only assigned when selfIndex
 * or learnerSelfIndex differs from -1.
 *
 * @param pCreate deserialized create-vnode request (read only here)
 * @param pCfg    output vnode configuration, fully overwritten
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgorithm = 0;
#endif

  pCfg->walCfg.vgId = pCreate->vgId;  // pCreate->mountVgId ? pCreate->mountVgId : pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;
#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->walCfg.encryptAlgorithm = 0;
#endif

#if defined(TD_ENTERPRISE) || defined(TD_ASTRA_TODO)
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tdbEncryptAlgorithm = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->ssChunkSize = pCreate->ssChunkSize;
  pCfg->ssKeepLocal = pCreate->ssKeepLocal;
  pCfg->ssCompact = pCreate->ssCompact;

  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum doubles as the source index.
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
    // The return value was never consulted; discard it explicitly instead of
    // parking it in an unused local.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // Learners follow the voters. Until the final adjustment below,
  // totalReplicaNum counts learners only and is used as the source index.
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
574,157✔
318

319
/**
 * Fill the wrapper configuration for a vnode that is about to be created.
 *
 * The vnode is recorded as not dropped and its path is formatted as
 * "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 *
 * @param pMgmt   vnode management object supplying the base path
 * @param pCreate create-vnode request supplying vgId and vgVersion
 * @param pCfg    output wrapper configuration
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = vgId;
}
576,863✔
325

326
/**
 * Handle a create-vnode request on this dnode.
 *
 * Steps: deserialize the request, check that the replica entry designated as
 * "self" matches this dnode (id, port, fqdn), check the encryption key when
 * SM4 is requested, derive the vnode and wrapper configurations, then create,
 * open and start the vnode and persist the vnode list. On a failure after
 * creation the vnode is closed and destroyed again.
 *
 * @param pMgmt vnode management object
 * @param pMsg  RPC message whose pCont/contLen hold the serialized SCreateVnodeReq
 * @return 0 on success, and also when the vnode already exists; otherwise an
 *         error code (terrno is set to the same value on the paths that reach
 *         the common exit)
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d ssChunkSize:%d ssKeepLocal:%d ssCompact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.ssChunkSize, req.ssKeepLocal, req.ssCompact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Print the learner's own dnode id (this used to read req.replicas[i].id).
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // The request names "self" either among the voters (selfIndex) or among the
  // learners (learnerSelfIndex). Reject an index outside the populated range
  // before it is used to address the replica arrays.
  const bool    selfIsVoter = (req.selfIndex != -1);
  const int32_t selfIdx = selfIsVoter ? req.selfIndex : req.learnerSelfIndex;
  const int32_t selfCount = selfIsVoter ? req.replica : req.learnerReplica;
  if (selfIdx < 0 || selfIdx >= selfCount) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, invalid self index:%d in create vnode request, replica:%d learnerReplica:%d, %s", req.vgId,
           selfIdx, req.replica, req.learnerReplica, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = selfIsVoter ? &req.replicas[selfIdx] : &req.learnerReplicas[selfIdx];
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    // Log first: the message reads fields of the request that is freed next.
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  if (req.encryptAlgorithm == DND_CA_SM4) {
    if (strlen(tsEncryptKey) == 0) {
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
      (void)tFreeSCreateVnodeReq(&req);
      return code;
    }
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): an existing vnode is reported as success (0), not as
    // TSDB_CODE_VND_ALREADY_EXIST - presumably so that a repeated create is
    // idempotent; confirm against the caller.
    code = TSDB_CODE_VND_ALREADY_EXIST;
    return 0;
  }
  // NOTE(review): when pVnode is a failed vnode with replica > 1 execution
  // continues, and pVnode is only released on the vnodeCreate() failure path
  // below - confirm whether the other paths should release it as well.

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    vmCloseFailedVnode(pMgmt, req.vgId);

    // pImpl is NULL when vnodeOpen() itself failed; there is nothing to close then.
    if (pImpl != NULL) {
      vnodeClose(pImpl);
    }
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
464

465
#ifdef USE_MOUNT
466
// Sort record pairing a (dbId, vgId) identity with a primary-disk index.
// compareVgDiskPrimary() orders these records by dbId, then by vgId.
typedef struct {
  int64_t dbId;         // database id, primary sort key
  int32_t vgId;         // vgroup id, secondary sort key
  int32_t diskPrimary;  // primary-disk index carried along with the key
} SMountDbVgId;
471
extern int32_t vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo);
472
extern int32_t mndFetchSdbStables(const char *mntName, const char *path, void *output);
473

UNCOV
474
static int compareVnodeInfo(const void *p1, const void *p2) {
×
UNCOV
475
  SVnodeInfo *v1 = (SVnodeInfo *)p1;
×
UNCOV
476
  SVnodeInfo *v2 = (SVnodeInfo *)p2;
×
477

UNCOV
478
  if (v1->config.dbId == v2->config.dbId) {
×
UNCOV
479
    if (v1->config.vgId == v2->config.vgId) {
×
480
      return 0;
×
481
    }
UNCOV
482
    return v1->config.vgId > v2->config.vgId ? 1 : -1;
×
483
  }
484

UNCOV
485
  return v1->config.dbId > v2->config.dbId ? 1 : -1;
×
486
}
UNCOV
487
static int compareVgDiskPrimary(const void *p1, const void *p2) {
×
UNCOV
488
  SMountDbVgId *v1 = (SMountDbVgId *)p1;
×
UNCOV
489
  SMountDbVgId *v2 = (SMountDbVgId *)p2;
×
490

UNCOV
491
  if (v1->dbId == v2->dbId) {
×
UNCOV
492
    if (v1->vgId == v2->vgId) {
×
493
      return 0;
×
494
    }
UNCOV
495
    return v1->vgId > v2->vgId ? 1 : -1;
×
496
  }
497

UNCOV
498
  return v1->dbId > v2->dbId ? 1 : -1;
×
499
}
500

UNCOV
501
/**
 * Read the dnode-level information of a mount path.
 *
 * Step 1 parses "<mountPath>/<dnode dir>/dnode.json" and rejects the mount
 * when the dnode is marked dropped, when encryptScope is non-zero, or when the
 * clusterId is 0; otherwise the clusterId is stored in pMountInfo.
 * Step 2 obtains the disk list through vmGetMountDisks() and copies each
 * directory name into pMountInfo->pDisks[level]; a primary disk is placed at
 * the front of level 0 once that level already has entries. Finally the number
 * of disks per tier is validated.
 *
 * Strings already pushed into pMountInfo->pDisks are not released here on
 * failure; NOTE(review): presumably the caller frees pMountInfo - confirm.
 *
 * @param pMgmt      vnode management object (forwarded to vmGetMountDisks)
 * @param pReq       request carrying mountPath, mountName and dnodeId
 * @param pMountInfo output; clusterId and pDisks[] are filled in
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountDnode(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t   code = 0, lino = 0;
  TdFilePtr pFile = NULL;
  char     *content = NULL;
  SJson    *pJson = NULL;
  int64_t   size = 0;
  int64_t   clusterId = 0, dropped = 0, encryptScope = 0;
  char      file[TSDB_MOUNT_FPATH_LEN] = {0};
  SArray   *pDisks = NULL;
  // step 1: fetch clusterId from dnode.json
  (void)snprintf(file, sizeof(file), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TAOS_CHECK_EXIT(taosStatFile(file, &size, NULL, NULL));
  TSDB_CHECK_NULL((pFile = taosOpenFile(file, TD_FILE_READ)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((content = taosMemoryMalloc(size + 1)), code, lino, _exit, terrno);
  if (taosReadFile(pFile, content, size) != size) {
    // Always leave on a short read, even if terrno was not set, so that a
    // partially filled buffer is never parsed (assumes TAOS_CHECK_EXIT only
    // jumps on a non-zero code).
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_INVALID_JSON_FORMAT);
  }
  content[size] = '\0';
  pJson = tjsonParse(content);
  if (pJson == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  tjsonGetNumberValue(pJson, "dropped", dropped, code);
  TAOS_CHECK_EXIT(code);
  if (dropped == 1) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DNODE_DROPPED);
  }
  tjsonGetNumberValue(pJson, "encryptScope", encryptScope, code);
  TAOS_CHECK_EXIT(code);
  if (encryptScope != 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
  }
  tjsonGetNumberValue(pJson, "clusterId", clusterId, code);
  TAOS_CHECK_EXIT(code);
  if (clusterId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
  }
  pMountInfo->clusterId = clusterId;
  // Release the step-1 resources early; the pointers are cleared so the
  // common exit below does not release them a second time.
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) {
    cJSON_Delete(pJson);
    pJson = NULL;
  }
  if (pFile != NULL) taosCloseFile(&pFile);
  // step 2: fetch dataDir from dnode/config/local.json
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pReq->mountPath, &pDisks));
  int32_t nDisks = taosArrayGetSize(pDisks);
  if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  for (int32_t i = 0; i < nDisks; ++i) {
    SDiskCfg *pDisk = TARRAY_GET_ELEM(pDisks, i);
    // The level comes from the mounted configuration and indexes pDisks[],
    // which is scanned up to TFS_MAX_TIERS below; reject anything outside.
    if (pDisk->level < 0 || pDisk->level >= TFS_MAX_TIERS) {
      dError("mount:%s, invalid disk level:%d", pReq->mountName, (int32_t)pDisk->level);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
    if (!pMountInfo->pDisks[pDisk->level]) {
      pMountInfo->pDisks[pDisk->level] = taosArrayInit(1, sizeof(char *));
      if (!pMountInfo->pDisks[pDisk->level]) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
    char *pDir = taosStrdup(pDisk->dir);
    if (pDir == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (pDisk->primary == 1 && taosArrayGetSize(pMountInfo->pDisks[0])) {
      // put the primary disk to the first position of level 0
      if (!taosArrayInsert(pMountInfo->pDisks[0], 0, &pDir)) {
        taosMemFree(pDir);
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
      }
    } else if (!taosArrayPush(pMountInfo->pDisks[pDisk->level], &pDir)) {
      taosMemFree(pDir);
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  // Level 0 needs at least one disk; no tier may exceed the per-tier maximum.
  for (int32_t i = 0; i < TFS_MAX_TIERS; ++i) {
    int32_t nDisk = taosArrayGetSize(pMountInfo->pDisks[i]);
    if (nDisk < (i == 0 ? 1 : 0) || nDisk > TFS_MAX_DISKS_PER_TIER) {
      dError("mount:%s, invalid disk number:%d at level:%d", pReq->mountName, nDisk, i);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }
  }
_exit:
  if (content != NULL) taosMemoryFreeClear(content);
  if (pJson != NULL) cJSON_Delete(pJson);
  if (pFile != NULL) taosCloseFile(&pFile);
  if (pDisks != NULL) {
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount dnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
  } else {
    dInfo("mount:%s, success to retrieve mount dnode on dnode:%d, clusterId:%" PRId64 ", path:%s", pReq->mountName,
          pReq->dnodeId, pMountInfo->clusterId, pReq->mountPath);
  }
  TAOS_RETURN(code);
}
598

UNCOV
599
/**
 * Collect the vnodes stored under the mount path and group them by database.
 *
 * Reads the vnode list below "<mountPath>/<vnode dir>", loads the info of every vnode that is not
 * marked dropped, validates it (replica number, vgId, encryption, clusterId, no repeated vgId after
 * sorting) and builds one SMountDbInfo per database, each holding the SMountVgInfo of its vnodes.
 *
 * @param pMgmt       not read by this function
 * @param pReq        mount request; mountName, mountPath and dnodeId are read
 * @param pMountInfo  in: pDisks[0] and clusterId; out: pDbs (assigned on success only)
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountVnodes(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t      code = 0, lino = 0;
  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  char         path[TSDB_MOUNT_FPATH_LEN] = {0};
  SVnodeMgmt   vnodeMgmt = {0};
  SArray      *pVgCfgs = NULL;
  SArray      *pDbInfos = NULL;
  SArray      *pDiskPrimarys = NULL;

  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  vnodeMgmt.path = path;
  TAOS_CHECK_EXIT(vmGetVnodeListFromFile(&vnodeMgmt, &pCfgs, &numOfVnodes));
  dInfo("mount:%s, num of vnodes is %d in path:%s", pReq->mountName, numOfVnodes, vnodeMgmt.path);
  // pVgCfgs is created with numOfVnodes slots up front; only the first j slots get filled below
  TSDB_CHECK_NULL((pVgCfgs = taosArrayInit_s(sizeof(SVnodeInfo), numOfVnodes)), code, lino, _exit, terrno);
  TSDB_CHECK_NULL((pDiskPrimarys = taosArrayInit(numOfVnodes, sizeof(SMountDbVgId))), code, lino, _exit, terrno);

  int32_t nDiskLevel0 = taosArrayGetSize(pMountInfo->pDisks[0]);
  int32_t nVgDropped = 0, j = 0;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SWrapperCfg *pCfg = &pCfgs[i];
    // in order to support multi-tier disk, the pCfg->path should be adapted according to the diskPrimary firstly
    if (nDiskLevel0 > 1) {
      char **ppDiskDir = taosArrayGet(pMountInfo->pDisks[0], pCfg->diskPrimary);
      if (!ppDiskDir) TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
      (void)snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%svnode%d", *ppDiskDir, TD_DIRSEP, TD_DIRSEP,
                     pCfg->vgId);
    }
    dInfo("mount:%s, vnode path:%s, dropped:%" PRIi8, pReq->mountName, pCfg->path, pCfg->dropped);
    if (pCfg->dropped) {
      ++nVgDropped;
      continue;
    }
    if (!taosCheckAccessFile(pCfg->path, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
      dError("mount:%s, vnode path:%s, no r/w authority", pReq->mountName, pCfg->path);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_NO_RIGHTS);
    }
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, j++);
    TAOS_CHECK_EXIT(vnodeLoadInfo(pCfg->path, pInfo));
    if (pInfo->config.syncCfg.replicaNum > 1) {
      dError("mount:%s, vnode path:%s, invalid replica:%d", pReq->mountName, pCfg->path,
             pInfo->config.syncCfg.replicaNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_REPLICA);
    } else if (pInfo->config.vgId != pCfg->vgId) {
      dError("mount:%s, vnode path:%s, vgId:%d not match:%d", pReq->mountName, pCfg->path, pInfo->config.vgId,
             pCfg->vgId);
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else if (pInfo->config.tdbEncryptAlgorithm || pInfo->config.tsdbCfg.encryptAlgorithm ||
               pInfo->config.walCfg.encryptAlgorithm) {
      dError("mount:%s, vnode path:%s, invalid encrypt algorithm, tdb:%d wal:%d tsdb:%d", pReq->mountName, pCfg->path,
             pInfo->config.tdbEncryptAlgorithm, pInfo->config.walCfg.encryptAlgorithm,
             pInfo->config.tsdbCfg.encryptAlgorithm);
      TAOS_CHECK_EXIT(TSDB_CODE_DNODE_INVALID_ENCRYPT_CONFIG);
    }
    SMountDbVgId dbVgId = {.dbId = pInfo->config.dbId, .vgId = pInfo->config.vgId, .diskPrimary = pCfg->diskPrimary};
    TSDB_CHECK_NULL(taosArrayPush(pDiskPrimarys, &dbVgId), code, lino, _exit, terrno);
  }
  if (nVgDropped > 0) {
    dInfo("mount:%s, %d vnodes are dropped", pReq->mountName, nVgDropped);
    // The j loaded entries sit at the front; cut off the nVgDropped unused slots at the tail so that
    // pVgCfgs and pDiskPrimarys describe the same set of vnodes.
    taosArrayRemoveBatch(pVgCfgs, j, nVgDropped, NULL);
  }
  int32_t nVgCfg = taosArrayGetSize(pVgCfgs);
  int32_t nDiskPrimary = taosArrayGetSize(pDiskPrimarys);
  if (nVgCfg != nDiskPrimary) {
    dError("mount:%s, nVgCfg:%d not match nDiskPrimary:%d", pReq->mountName, nVgCfg, nDiskPrimary);
    TAOS_CHECK_EXIT(TSDB_CODE_APP_ERROR);
  }
  if (nVgCfg > 1) {
    // both arrays are sorted so that index i refers to the same vnode in each of them
    taosArraySort(pVgCfgs, compareVnodeInfo);
    taosArraySort(pDiskPrimarys, compareVgDiskPrimary);
  }

  int64_t clusterId = pMountInfo->clusterId;
  int64_t dbId = 0, vgId = 0, nDb = 0;
  for (int32_t i = 0; i < nVgCfg; ++i) {
    SVnodeInfo *pInfo = TARRAY_GET_ELEM(pVgCfgs, i);
    if (clusterId != pInfo->config.syncCfg.nodeInfo->clusterId) {
      dError("mount:%s, clusterId:%" PRId64 " not match:%" PRId64, pReq->mountName, clusterId,
             pInfo->config.syncCfg.nodeInfo->clusterId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_INVALID_CLUSTER_ID);
    }
    // The first entry always opens a database, exactly as the fill loop below does; otherwise a
    // leading dbId of 0 would be missed here and the fill loop would index past pDbInfos.
    if (i == 0 || dbId != pInfo->config.dbId) {
      dbId = pInfo->config.dbId;
      ++nDb;
    }
    if (vgId == pInfo->config.vgId) {
      TAOS_CHECK_EXIT(TSDB_CODE_FILE_CORRUPTED);
    } else {
      vgId = pInfo->config.vgId;
    }
  }

  if (nDb > 0) {
    TSDB_CHECK_NULL((pDbInfos = taosArrayInit_s(sizeof(SMountDbInfo), nDb)), code, lino, _exit, terrno);
    int32_t dbIdx = -1;
    for (int32_t i = 0; i < nVgCfg; ++i) {
      SVnodeInfo   *pVgCfg = TARRAY_GET_ELEM(pVgCfgs, i);
      SMountDbVgId *pDiskPrimary = TARRAY_GET_ELEM(pDiskPrimarys, i);
      SMountDbInfo *pDbInfo = NULL;
      if (i == 0 || ((SMountDbInfo *)TARRAY_GET_ELEM(pDbInfos, dbIdx))->dbId != pVgCfg->config.dbId) {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, ++dbIdx);
        pDbInfo->dbId = pVgCfg->config.dbId;
        snprintf(pDbInfo->dbName, sizeof(pDbInfo->dbName), "%s", pVgCfg->config.dbname);
        TSDB_CHECK_NULL((pDbInfo->pVgs = taosArrayInit(nVgCfg / nDb, sizeof(SMountVgInfo))), code, lino, _exit, terrno);
      } else {
        pDbInfo = TARRAY_GET_ELEM(pDbInfos, dbIdx);
      }
      SMountVgInfo vgInfo = {
          .diskPrimary = pDiskPrimary->diskPrimary,
          .vgId = pVgCfg->config.vgId,
          .dbId = pVgCfg->config.dbId,
          .cacheLastSize = pVgCfg->config.cacheLastSize,
          .szPage = pVgCfg->config.szPage,
          .szCache = pVgCfg->config.szCache,
          .szBuf = pVgCfg->config.szBuf,
          .cacheLast = pVgCfg->config.cacheLast,
          .standby = pVgCfg->config.standby,
          .hashMethod = pVgCfg->config.hashMethod,
          .hashBegin = pVgCfg->config.hashBegin,
          .hashEnd = pVgCfg->config.hashEnd,
          .hashPrefix = pVgCfg->config.hashPrefix,
          .hashSuffix = pVgCfg->config.hashSuffix,
          .sttTrigger = pVgCfg->config.sttTrigger,
          .replications = pVgCfg->config.syncCfg.replicaNum,
          .precision = pVgCfg->config.tsdbCfg.precision,
          .compression = pVgCfg->config.tsdbCfg.compression,
          .slLevel = pVgCfg->config.tsdbCfg.slLevel,
          .daysPerFile = pVgCfg->config.tsdbCfg.days,
          .keep0 = pVgCfg->config.tsdbCfg.keep0,
          .keep1 = pVgCfg->config.tsdbCfg.keep1,
          .keep2 = pVgCfg->config.tsdbCfg.keep2,
          .keepTimeOffset = pVgCfg->config.tsdbCfg.keepTimeOffset,
          .minRows = pVgCfg->config.tsdbCfg.minRows,
          .maxRows = pVgCfg->config.tsdbCfg.maxRows,
          .tsdbPageSize = pVgCfg->config.tsdbPageSize / 1024,
          .ssChunkSize = pVgCfg->config.ssChunkSize,
          .ssKeepLocal = pVgCfg->config.ssKeepLocal,
          .ssCompact = pVgCfg->config.ssCompact,
          .walFsyncPeriod = pVgCfg->config.walCfg.fsyncPeriod,
          .walRetentionPeriod = pVgCfg->config.walCfg.retentionPeriod,
          .walRollPeriod = pVgCfg->config.walCfg.rollPeriod,
          .walRetentionSize = pVgCfg->config.walCfg.retentionSize,
          .walSegSize = pVgCfg->config.walCfg.segSize,
          .walLevel = pVgCfg->config.walCfg.level,
          .encryptAlgorithm = pVgCfg->config.walCfg.encryptAlgorithm,
          .committed = pVgCfg->state.committed,
          .commitID = pVgCfg->state.commitID,
          .commitTerm = pVgCfg->state.commitTerm,
          .numOfSTables = pVgCfg->config.vndStats.numOfSTables,
          .numOfCTables = pVgCfg->config.vndStats.numOfCTables,
          .numOfNTables = pVgCfg->config.vndStats.numOfNTables,
      };
      TSDB_CHECK_NULL(taosArrayPush(pDbInfo->pVgs, &vgInfo), code, lino, _exit, terrno);
    }
  }

  // ownership of pDbInfos moves to pMountInfo only when everything succeeded
  pMountInfo->pDbs = pDbInfos;

_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount vnode at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
    // pDbInfos was never handed to pMountInfo: release it together with the per-db vgroup arrays
    int32_t nDbInfo = taosArrayGetSize(pDbInfos);
    for (int32_t i = 0; i < nDbInfo; ++i) {
      SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pDbInfos, i);
      taosArrayDestroy(pDbInfo->pVgs);
    }
    taosArrayDestroy(pDbInfos);
  }
  taosArrayDestroy(pDiskPrimarys);
  taosArrayDestroy(pVgCfgs);
  taosMemoryFreeClear(pCfgs);
  TAOS_RETURN(code);
}
770

771
/**
772
 *   Retrieve the stables from vnode meta.
773
 */
UNCOV
774
static int32_t vmRetrieveMountStbs(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
×
UNCOV
775
  int32_t code = 0, lino = 0;
×
UNCOV
776
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
×
UNCOV
777
  int32_t nDb = taosArrayGetSize(pMountInfo->pDbs);
×
UNCOV
778
  SArray *suidList = NULL;
×
UNCOV
779
  SArray *pCols = NULL;
×
UNCOV
780
  SArray *pTags = NULL;
×
UNCOV
781
  SArray *pColExts = NULL;
×
UNCOV
782
  SArray *pTagExts = NULL;
×
783

UNCOV
784
  snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
×
UNCOV
785
  for (int32_t i = 0; i < nDb; ++i) {
×
UNCOV
786
    SMountDbInfo *pDbInfo = TARRAY_GET_ELEM(pMountInfo->pDbs, i);
×
UNCOV
787
    int32_t       nVg = taosArrayGetSize(pDbInfo->pVgs);
×
UNCOV
788
    for (int32_t j = 0; j < nVg; ++j) {
×
UNCOV
789
      SMountVgInfo *pVgInfo = TARRAY_GET_ELEM(pDbInfo->pVgs, j);
×
UNCOV
790
      SVnode        vnode = {
×
UNCOV
791
                 .config.vgId = pVgInfo->vgId,
×
UNCOV
792
                 .config.dbId = pVgInfo->dbId,
×
UNCOV
793
                 .config.cacheLastSize = pVgInfo->cacheLastSize,
×
UNCOV
794
                 .config.szPage = pVgInfo->szPage,
×
UNCOV
795
                 .config.szCache = pVgInfo->szCache,
×
UNCOV
796
                 .config.szBuf = pVgInfo->szBuf,
×
UNCOV
797
                 .config.cacheLast = pVgInfo->cacheLast,
×
UNCOV
798
                 .config.standby = pVgInfo->standby,
×
UNCOV
799
                 .config.hashMethod = pVgInfo->hashMethod,
×
UNCOV
800
                 .config.hashBegin = pVgInfo->hashBegin,
×
UNCOV
801
                 .config.hashEnd = pVgInfo->hashEnd,
×
UNCOV
802
                 .config.hashPrefix = pVgInfo->hashPrefix,
×
UNCOV
803
                 .config.hashSuffix = pVgInfo->hashSuffix,
×
UNCOV
804
                 .config.sttTrigger = pVgInfo->sttTrigger,
×
UNCOV
805
                 .config.syncCfg.replicaNum = pVgInfo->replications,
×
UNCOV
806
                 .config.tsdbCfg.precision = pVgInfo->precision,
×
UNCOV
807
                 .config.tsdbCfg.compression = pVgInfo->compression,
×
UNCOV
808
                 .config.tsdbCfg.slLevel = pVgInfo->slLevel,
×
UNCOV
809
                 .config.tsdbCfg.days = pVgInfo->daysPerFile,
×
UNCOV
810
                 .config.tsdbCfg.keep0 = pVgInfo->keep0,
×
UNCOV
811
                 .config.tsdbCfg.keep1 = pVgInfo->keep1,
×
UNCOV
812
                 .config.tsdbCfg.keep2 = pVgInfo->keep2,
×
UNCOV
813
                 .config.tsdbCfg.keepTimeOffset = pVgInfo->keepTimeOffset,
×
UNCOV
814
                 .config.tsdbCfg.minRows = pVgInfo->minRows,
×
UNCOV
815
                 .config.tsdbCfg.maxRows = pVgInfo->maxRows,
×
UNCOV
816
                 .config.tsdbPageSize = pVgInfo->tsdbPageSize,
×
UNCOV
817
                 .config.ssChunkSize = pVgInfo->ssChunkSize,
×
UNCOV
818
                 .config.ssKeepLocal = pVgInfo->ssKeepLocal,
×
UNCOV
819
                 .config.ssCompact = pVgInfo->ssCompact,
×
UNCOV
820
                 .config.walCfg.fsyncPeriod = pVgInfo->walFsyncPeriod,
×
UNCOV
821
                 .config.walCfg.retentionPeriod = pVgInfo->walRetentionPeriod,
×
UNCOV
822
                 .config.walCfg.rollPeriod = pVgInfo->walRollPeriod,
×
UNCOV
823
                 .config.walCfg.retentionSize = pVgInfo->walRetentionSize,
×
UNCOV
824
                 .config.walCfg.segSize = pVgInfo->walSegSize,
×
UNCOV
825
                 .config.walCfg.level = pVgInfo->walLevel,
×
UNCOV
826
                 .config.walCfg.encryptAlgorithm = pVgInfo->encryptAlgorithm,
×
UNCOV
827
                 .diskPrimary = pVgInfo->diskPrimary,
×
828
      };
UNCOV
829
      void *vnodePath = taosArrayGet(pMountInfo->pDisks[0], pVgInfo->diskPrimary);
×
UNCOV
830
      snprintf(path, sizeof(path), "%s%s%s%svnode%d", *(char **)vnodePath, TD_DIRSEP, dmNodeName(VNODE), TD_DIRSEP,
×
831
               pVgInfo->vgId);
UNCOV
832
      vnode.path = path;
×
833

UNCOV
834
      int32_t rollback = vnodeShouldRollback(&vnode);
×
UNCOV
835
      if ((code = metaOpen(&vnode, &vnode.pMeta, rollback)) != 0) {
×
836
        dError("mount:%s, failed to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d since %s, path:%s",
×
837
               pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, tstrerror(code), path);
838
        TAOS_CHECK_EXIT(code);
×
839
      } else {
UNCOV
840
        dInfo("mount:%s, success to retrieve stbs of vnode:%d for db:%" PRId64 " on dnode:%d, path:%s", pReq->mountName,
×
841
              pVgInfo->vgId, pVgInfo->dbId, pReq->dnodeId, path);
842

UNCOV
843
        SMetaReader mr = {0};
×
UNCOV
844
        tb_uid_t    suid = 0;
×
UNCOV
845
        SMeta      *pMeta = vnode.pMeta;
×
846

UNCOV
847
        metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
×
UNCOV
848
        if (!suidList && !(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
849
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
850
        }
UNCOV
851
        taosArrayClear(suidList);
×
UNCOV
852
        TSDB_CHECK_CODE(vnodeGetStbIdList(&vnode, 0, suidList), lino, _exit0);
×
UNCOV
853
        dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stbs num:%d on dnode:%d", pReq->mountName, pVgInfo->vgId,
×
854
              pVgInfo->dbId, (int32_t)taosArrayGetSize(suidList), pReq->dnodeId);
UNCOV
855
        int32_t nStbs = taosArrayGetSize(suidList);
×
UNCOV
856
        if (!pDbInfo->pStbs && !(pDbInfo->pStbs = taosArrayInit(nStbs, sizeof(void *)))) {
×
857
          TSDB_CHECK_CODE(terrno, lino, _exit0);
×
858
        }
UNCOV
859
        for (int32_t i = 0; i < nStbs; ++i) {
×
UNCOV
860
          suid = *(tb_uid_t *)taosArrayGet(suidList, i);
×
UNCOV
861
          dInfo("mount:%s, vnode:%d, db:%" PRId64 ", stb suid:%" PRIu64 " on dnode:%d", pReq->mountName, pVgInfo->vgId,
×
862
                pVgInfo->dbId, suid, pReq->dnodeId);
UNCOV
863
          if ((code = metaReaderGetTableEntryByUidCache(&mr, suid)) < 0) {
×
864
            TSDB_CHECK_CODE(code, lino, _exit0);
×
865
          }
UNCOV
866
          if (mr.me.uid != suid || mr.me.type != TSDB_SUPER_TABLE ||
×
UNCOV
867
              mr.me.colCmpr.nCols != mr.me.stbEntry.schemaRow.nCols) {
×
868
            dError("mount:%s, vnode:%d, db:%" PRId64 ", stb info not match, suid:%" PRIu64 " expected:%" PRIu64
×
869
                   ", type:%" PRIi8 " expected:%d, nCmprCols:%d nCols:%d on dnode:%d",
870
                   pReq->mountName, pVgInfo->vgId, pVgInfo->dbId, mr.me.uid, suid, mr.me.type, TSDB_SUPER_TABLE,
871
                   mr.me.colCmpr.nCols, mr.me.stbEntry.schemaRow.nCols, pReq->dnodeId);
872
            TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
873
          }
UNCOV
874
          SMountStbInfo stbInfo = {
×
875
              .req.source = TD_REQ_FROM_APP,
876
              .req.suid = suid,
UNCOV
877
              .req.colVer = mr.me.stbEntry.schemaRow.version,
×
UNCOV
878
              .req.tagVer = mr.me.stbEntry.schemaTag.version,
×
UNCOV
879
              .req.numOfColumns = mr.me.stbEntry.schemaRow.nCols,
×
UNCOV
880
              .req.numOfTags = mr.me.stbEntry.schemaTag.nCols,
×
UNCOV
881
              .req.virtualStb = TABLE_IS_VIRTUAL(mr.me.flags) ? 1 : 0,
×
882
          };
UNCOV
883
          snprintf(stbInfo.req.name, sizeof(stbInfo.req.name), "%s", mr.me.name);
×
UNCOV
884
          if (!pCols && !(pCols = taosArrayInit(stbInfo.req.numOfColumns, sizeof(SFieldWithOptions)))) {
×
885
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
886
          }
UNCOV
887
          if (!pTags && !(pTags = taosArrayInit(stbInfo.req.numOfTags, sizeof(SField)))) {
×
888
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
889
          }
890

UNCOV
891
          if (!pColExts && !(pColExts = taosArrayInit(stbInfo.req.numOfColumns, sizeof(col_id_t)))) {
×
892
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
893
          }
UNCOV
894
          if (!pTagExts && !(pTagExts = taosArrayInit(stbInfo.req.numOfTags, sizeof(col_id_t)))) {
×
895
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
896
          }
UNCOV
897
          taosArrayClear(pCols);
×
UNCOV
898
          taosArrayClear(pTags);
×
UNCOV
899
          taosArrayClear(pColExts);
×
UNCOV
900
          taosArrayClear(pTagExts);
×
UNCOV
901
          stbInfo.req.pColumns = pCols;
×
UNCOV
902
          stbInfo.req.pTags = pTags;
×
UNCOV
903
          stbInfo.pColExts = pColExts;
×
UNCOV
904
          stbInfo.pTagExts = pTagExts;
×
905

UNCOV
906
          for (int32_t c = 0; c < stbInfo.req.numOfColumns; ++c) {
×
UNCOV
907
            SSchema          *pSchema = mr.me.stbEntry.schemaRow.pSchema + c;
×
UNCOV
908
            SColCmpr         *pColComp = mr.me.colCmpr.pColCmpr + c;
×
UNCOV
909
            SFieldWithOptions col = {
×
UNCOV
910
                .type = pSchema->type,
×
UNCOV
911
                .flags = pSchema->flags,
×
UNCOV
912
                .bytes = pSchema->bytes,
×
UNCOV
913
                .compress = pColComp->alg,
×
914
            };
UNCOV
915
            (void)snprintf(col.name, sizeof(col.name), "%s", pSchema->name);
×
UNCOV
916
            if (pSchema->colId != pColComp->id) {
×
917
              TSDB_CHECK_CODE(TSDB_CODE_FILE_CORRUPTED, lino, _exit0);
×
918
            }
UNCOV
919
            if (mr.me.pExtSchemas) {
×
920
              col.typeMod = (mr.me.pExtSchemas + c)->typeMod;
×
921
            }
UNCOV
922
            TSDB_CHECK_NULL(taosArrayPush(pCols, &col), code, lino, _exit0, terrno);
×
UNCOV
923
            TSDB_CHECK_NULL(taosArrayPush(pColExts, &pSchema->colId), code, lino, _exit0, terrno);
×
924
          }
UNCOV
925
          for (int32_t t = 0; t < stbInfo.req.numOfTags; ++t) {
×
UNCOV
926
            SSchema *pSchema = mr.me.stbEntry.schemaTag.pSchema + t;
×
UNCOV
927
            SField   tag = {
×
UNCOV
928
                  .type = pSchema->type,
×
UNCOV
929
                  .flags = pSchema->flags,
×
UNCOV
930
                  .bytes = pSchema->bytes,
×
931
            };
UNCOV
932
            (void)snprintf(tag.name, sizeof(tag.name), "%s", pSchema->name);
×
UNCOV
933
            TSDB_CHECK_NULL(taosArrayPush(pTags, &tag), code, lino, _exit0, terrno);
×
UNCOV
934
            TSDB_CHECK_NULL(taosArrayPush(pTagExts, &pSchema->colId), code, lino, _exit0, terrno);
×
935
          }
UNCOV
936
          tDecoderClear(&mr.coder);
×
937

938
          // serialize the SMountStbInfo
UNCOV
939
          int32_t firstPartLen = 0;
×
UNCOV
940
          int32_t msgLen = tSerializeSMountStbInfo(NULL, 0, &firstPartLen, &stbInfo);
×
UNCOV
941
          if (msgLen <= 0) {
×
942
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
943
          }
UNCOV
944
          void *pBuf = taosMemoryMalloc((sizeof(int32_t) << 1) + msgLen);  // totalLen(4)|1stPartLen(4)|1stPart|2ndPart
×
UNCOV
945
          if (!pBuf) TSDB_CHECK_CODE(TSDB_CODE_OUT_OF_MEMORY, lino, _exit0);
×
UNCOV
946
          *(int32_t *)pBuf = (sizeof(int32_t) << 1) + msgLen;
×
UNCOV
947
          *(int32_t *)POINTER_SHIFT(pBuf, sizeof(int32_t)) = firstPartLen;
×
UNCOV
948
          if (tSerializeSMountStbInfo(POINTER_SHIFT(pBuf, (sizeof(int32_t) << 1)), msgLen, NULL, &stbInfo) <= 0) {
×
949
            taosMemoryFree(pBuf);
×
950
            TSDB_CHECK_CODE(msgLen < 0 ? msgLen : TSDB_CODE_INTERNAL_ERROR, lino, _exit0);
×
951
          }
UNCOV
952
          if (!taosArrayPush(pDbInfo->pStbs, &pBuf)) {
×
953
            taosMemoryFree(pBuf);
×
954
            TSDB_CHECK_CODE(terrno, lino, _exit0);
×
955
          }
956
        }
UNCOV
957
      _exit0:
×
UNCOV
958
        metaReaderClear(&mr);
×
UNCOV
959
        metaClose(&vnode.pMeta);
×
UNCOV
960
        TAOS_CHECK_EXIT(code);
×
961
      }
UNCOV
962
      break;  // retrieve stbs from one vnode is enough
×
963
    }
964
  }
UNCOV
965
_exit:
×
UNCOV
966
  if (code != 0) {
×
967
    dError("mount:%s, failed to retrieve mount stbs at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
968
           pReq->dnodeId, tstrerror(code), path);
969
  }
UNCOV
970
  taosArrayDestroy(suidList);
×
UNCOV
971
  taosArrayDestroy(pCols);
×
UNCOV
972
  taosArrayDestroy(pTags);
×
UNCOV
973
  taosArrayDestroy(pColExts);
×
UNCOV
974
  taosArrayDestroy(pTagExts);
×
UNCOV
975
  TAOS_RETURN(code);
×
976
}
977

UNCOV
978
/**
 * Open "<mountPath>/.running" and try to lock it.
 *
 * The lock is attempted at least once and retried, one second apart, until it succeeds or
 * retryTimes reaches retryLimit.
 *
 * @param mountName   mount name, used for logging only
 * @param mountPath   directory in which the ".running" file is created/opened
 * @param pFile       out: the opened file on success; closed and set to NULL on failure
 * @param retryLimit  upper bound for the number of failed lock attempts
 * @return 0 on success, otherwise the error of the failed open or of the last lock attempt
 */
int32_t vmMountCheckRunning(const char *mountName, const char *mountPath, TdFilePtr *pFile, int32_t retryLimit) {
  int32_t code = 0, lino = 0;
  int32_t retryTimes = 0;
  int32_t ret = 0;
  char    filepath[PATH_MAX] = {0};

  (void)snprintf(filepath, sizeof(filepath), "%s%s.running", mountPath, TD_DIRSEP);
  TSDB_CHECK_NULL((*pFile = taosOpenFile(
                       filepath, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CLOEXEC)),
                  code, lino, _exit, terrno);
  for (;;) {
    ret = taosLockFile(*pFile);
    if (ret == 0) break;
    ++retryTimes;
    // log right away, before any waiting
    dError("mount:%s, failed to lock file:%s since %s, retryTimes:%d", mountName, filepath, tstrerror(ret), retryTimes);
    // no point in sleeping when no further attempt will follow
    if (retryTimes >= retryLimit) break;
    taosMsleep(1000);
  }
  TAOS_CHECK_EXIT(ret);
_exit:
  if (code != 0) {
    (void)taosCloseFile(pFile);
    *pFile = NULL;
    dError("mount:%s, failed to check running at line %d since %s, path:%s", mountName, lino, tstrerror(code),
           filepath);
  }
  TAOS_RETURN(code);
}
1004

UNCOV
1005
/**
 * Sanity-check the mount path before anything is read from it.
 *
 * Verifies, in order: the mount path itself, that its ".running" file can be locked (stored in
 * pMountInfo->pFile), "<dnode>/dnode.json", the mnode directory, the vnode directory and
 * "<dnode>/config/local.json".
 *
 * @param pMgmt       not read by this function
 * @param pReq        mount request; mountName, mountPath and dnodeId are read
 * @param pMountInfo  out: pFile receives the locked ".running" file
 * @return 0 on success, otherwise an error code
 */
static int32_t vmRetrieveMountPreCheck(SVnodeMgmt *pMgmt, SRetrieveMountPathReq *pReq, SMountInfo *pMountInfo) {
  int32_t code = 0, lino = 0;
  char    path[TSDB_MOUNT_FPATH_LEN] = {0};
  // taosCheckAccessFile is driven by TD_FILE_ACCESS_* masks (as used for the vnode paths in this
  // file); O_RDONLY is an open(2) flag and does not express "exists and is readable".
  const int32_t readable = TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK;

  // keep `path` in sync with what is being checked so that the error log names the right location
  (void)snprintf(path, sizeof(path), "%s", pReq->mountPath);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, readable), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  TAOS_CHECK_EXIT(vmMountCheckRunning(pReq->mountName, pReq->mountPath, &pMountInfo->pFile, 3));
  (void)snprintf(path, sizeof(path), "%s%s%s%sdnode.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE), TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, readable), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(MNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, readable), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s", pReq->mountPath, TD_DIRSEP, dmNodeName(VNODE));
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, readable), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
  (void)snprintf(path, sizeof(path), "%s%s%s%sconfig%slocal.json", pReq->mountPath, TD_DIRSEP, dmNodeName(DNODE),
                 TD_DIRSEP, TD_DIRSEP);
  TSDB_CHECK_CONDITION(taosCheckAccessFile(path, readable), code, lino, _exit, TAOS_SYSTEM_ERROR(errno));
_exit:
  if (code != 0) {
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
           pReq->dnodeId, tstrerror(code), path);
  }
  TAOS_RETURN(code);
}
1026

UNCOV
1027
static int32_t vmRetrieveMountPathImpl(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, SRetrieveMountPathReq *pReq,
×
1028
                                       SMountInfo *pMountInfo) {
UNCOV
1029
  int32_t code = 0, lino = 0;
×
UNCOV
1030
  pMountInfo->dnodeId = pReq->dnodeId;
×
UNCOV
1031
  pMountInfo->mountUid = pReq->mountUid;
×
UNCOV
1032
  (void)tsnprintf(pMountInfo->mountName, sizeof(pMountInfo->mountName), "%s", pReq->mountName);
×
UNCOV
1033
  (void)tsnprintf(pMountInfo->mountPath, sizeof(pMountInfo->mountPath), "%s", pReq->mountPath);
×
UNCOV
1034
  pMountInfo->ignoreExist = pReq->ignoreExist;
×
UNCOV
1035
  pMountInfo->valLen = pReq->valLen;
×
UNCOV
1036
  pMountInfo->pVal = pReq->pVal;
×
UNCOV
1037
  TAOS_CHECK_EXIT(vmRetrieveMountPreCheck(pMgmt, pReq, pMountInfo));
×
UNCOV
1038
  TAOS_CHECK_EXIT(vmRetrieveMountDnode(pMgmt, pReq, pMountInfo));
×
UNCOV
1039
  TAOS_CHECK_EXIT(vmRetrieveMountVnodes(pMgmt, pReq, pMountInfo));
×
UNCOV
1040
  TAOS_CHECK_EXIT(vmRetrieveMountStbs(pMgmt, pReq, pMountInfo));
×
UNCOV
1041
_exit:
×
UNCOV
1042
  if (code != 0) {
×
UNCOV
1043
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", pReq->mountName, lino,
×
1044
           pReq->dnodeId, tstrerror(code), pReq->mountPath);
1045
  }
UNCOV
1046
  TAOS_RETURN(code);
×
1047
}
1048

UNCOV
1049
/**
 * Handle a "retrieve mount path" request.
 *
 * Deserializes the request, collects the mount information and serializes it into the response
 * of pMsg. The response is built whether or not the retrieval succeeded; a failure while building
 * the response itself is tracked separately in rspCode and takes precedence in the return value.
 *
 * @param pMgmt  vnode management object; a local copy with path reset to NULL is passed on
 * @param pMsg   in: pCont/contLen hold the request; out: info.rsp/info.rspLen hold the response
 * @return 0 on success, otherwise rspCode if the response could not be built, else the retrieval error
 */
int32_t vmProcessRetrieveMountPathReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t               code = 0, lino = 0;
  int32_t               rspCode = 0;
  int32_t               bufLen = 0;
  void                 *pBuf = NULL;
  SMountInfo            mountInfo = {0};
  SRetrieveMountPathReq req = {0};
  SVnodeMgmt            vndMgmt = *pMgmt;  // work on a private copy of the management object

  vndMgmt.path = NULL;
  TAOS_CHECK_GOTO(tDeserializeSRetrieveMountPathReq(pMsg->pCont, pMsg->contLen, &req), &lino, _end);
  dInfo("mount:%s, start to retrieve path:%s", req.mountName, req.mountPath);
  TAOS_CHECK_GOTO(vmRetrieveMountPathImpl(&vndMgmt, pMsg, &req, &mountInfo), &lino, _end);

_end:
  // measure, allocate, then serialize the response
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), rspCode, lino, _exit, terrno);
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mountInfo)) >= 0, rspCode, lino, _exit, bufLen);
  pMsg->info.rsp = pBuf;
  pMsg->info.rspLen = bufLen;

_exit:
  if (rspCode == 0 && code == 0) {
    int32_t nDbs = taosArrayGetSize(mountInfo.pDbs);
    int32_t nVgs = 0;
    for (int32_t idx = 0; idx < nDbs; ++idx) {
      SMountDbInfo *pDb = TARRAY_GET_ELEM(mountInfo.pDbs, idx);
      nVgs += taosArrayGetSize(pDb->pVgs);
    }
    dInfo("mount:%s, success to retrieve mount, nDbs:%d, nVgs:%d, path:%s", req.mountName, nDbs, nVgs, req.mountPath);
  } else if (rspCode != 0) {
    // corner case: if occurs, the client will not receive the response, and the client should be killed manually
    dError("mount:%s, failed to retrieve mount at line %d since %s, dnode:%d, path:%s", req.mountName, lino,
           tstrerror(rspCode), req.dnodeId, req.mountPath);
    rpcFreeCont(pBuf);
    code = rspCode;
  } else {
    // the client would receive the response with error msg
    dError("mount:%s, failed to retrieve mount at line %d on dnode:%d since %s, path:%s", req.mountName, lino,
           req.dnodeId, tstrerror(code), req.mountPath);
  }

  taosMemFreeClear(vndMgmt.path);
  tFreeMountInfo(&mountInfo, false);
  TAOS_RETURN(code);
}
1093

UNCOV
1094
static int32_t vmMountVnode(SVnodeMgmt *pMgmt, const char *path, SVnodeCfg *pCfg, int32_t diskPrimary,
×
1095
                            SMountVnodeReq *req, STfs *pMountTfs) {
UNCOV
1096
  int32_t    code = 0;
×
UNCOV
1097
  SVnodeInfo info = {0};
×
UNCOV
1098
  char       hostDir[TSDB_FILENAME_LEN] = {0};
×
UNCOV
1099
  char       mountDir[TSDB_FILENAME_LEN] = {0};
×
UNCOV
1100
  char       mountVnode[32] = {0};
×
1101

UNCOV
1102
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
×
1103
    vError("vgId:%d, mount:%s, failed to mount vnode since:%s", pCfg->vgId, req->mountName, tstrerror(code));
×
1104
    return code;
×
1105
  }
1106

UNCOV
1107
  vnodeGetPrimaryDir(path, 0, pMgmt->pTfs, hostDir, TSDB_FILENAME_LEN);
×
UNCOV
1108
  if ((code = taosMkDir(hostDir))) {
×
1109
    vError("vgId:%d, mount:%s, failed to prepare vnode dir since %s, host path: %s", pCfg->vgId, req->mountName,
×
1110
           tstrerror(code), hostDir);
1111
    return code;
×
1112
  }
1113

UNCOV
1114
  info.config = *pCfg;  // copy the config
×
UNCOV
1115
  info.state.committed = req->committed;
×
UNCOV
1116
  info.state.commitID = req->commitID;
×
UNCOV
1117
  info.state.commitTerm = req->commitTerm;
×
UNCOV
1118
  info.state.applied = req->committed;
×
UNCOV
1119
  info.state.applyTerm = req->commitTerm;
×
UNCOV
1120
  info.config.vndStats.numOfSTables = req->numOfSTables;
×
UNCOV
1121
  info.config.vndStats.numOfCTables = req->numOfCTables;
×
UNCOV
1122
  info.config.vndStats.numOfNTables = req->numOfNTables;
×
1123

UNCOV
1124
  SVnodeInfo oldInfo = {0};
×
UNCOV
1125
  oldInfo.config = vnodeCfgDefault;
×
UNCOV
1126
  if (vnodeLoadInfo(hostDir, &oldInfo) == 0) {
×
1127
    if (oldInfo.config.dbId != info.config.dbId) {
×
1128
      code = TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
1129
      vError("vgId:%d, mount:%s, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
1130
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
1131
             oldInfo.config.vgId, req->mountName, hostDir, oldInfo.config.dbId, oldInfo.config.dbname,
1132
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
1133
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
1134

1135
    } else {
1136
      vWarn("vgId:%d, mount:%s, vnode config info already exists at %s.", oldInfo.config.vgId, req->mountName, hostDir);
×
1137
    }
1138
    return code;
×
1139
  }
1140

UNCOV
1141
  char hostSubDir[TSDB_FILENAME_LEN] = {0};
×
UNCOV
1142
  char mountSubDir[TSDB_FILENAME_LEN] = {0};
×
UNCOV
1143
  (void)snprintf(mountVnode, sizeof(mountVnode), "vnode%svnode%d", TD_DIRSEP, req->mountVgId);
×
UNCOV
1144
  vnodeGetPrimaryDir(mountVnode, diskPrimary, pMountTfs, mountDir, TSDB_FILENAME_LEN);
×
1145
  static const char *vndSubDirs[] = {"meta", "sync", "tq", "tsdb", "wal"};
UNCOV
1146
  for (int32_t i = 0; i < tListLen(vndSubDirs); ++i) {
×
UNCOV
1147
    (void)snprintf(hostSubDir, sizeof(hostSubDir), "%s%s%s", hostDir, TD_DIRSEP, vndSubDirs[i]);
×
UNCOV
1148
    (void)snprintf(mountSubDir, sizeof(mountSubDir), "%s%s%s", mountDir, TD_DIRSEP, vndSubDirs[i]);
×
UNCOV
1149
    if ((code = taosSymLink(mountSubDir, hostSubDir)) != 0) {
×
1150
      vError("vgId:%d, mount:%s, failed to create vnode symlink %s -> %s since %s", info.config.vgId, req->mountName,
×
1151
             mountSubDir, hostSubDir, tstrerror(code));
1152
      return code;
×
1153
    }
1154
  }
UNCOV
1155
  vInfo("vgId:%d, mount:save vnode config while create", info.config.vgId);
×
UNCOV
1156
  if ((code = vnodeSaveInfo(hostDir, &info)) < 0 || (code = vnodeCommitInfo(hostDir)) < 0) {
×
1157
    vError("vgId:%d, mount:%s, failed to save vnode config since %s, mount path: %s", pCfg ? pCfg->vgId : 0,
×
1158
           req->mountName, tstrerror(code), hostDir);
1159
    return code;
×
1160
  }
UNCOV
1161
  vInfo("vgId:%d, mount:%s, vnode is mounted from %s to %s", info.config.vgId, req->mountName, mountDir, hostDir);
×
UNCOV
1162
  return 0;
×
1163
}
1164

UNCOV
1165
// Handle a mount-vnode request.
//
// Deserializes the request, checks that the replica entry addressed to this node matches the
// local dnode id / port / fqdn, rejects the request when the vnode is already present, then
// acquires the mount's TFS, prepares the host directory (vmMountVnode), opens and starts the
// vnode, and persists the vnode and mount lists.
//
// The result code is stored in pMsg->code and also returned; pMsg->info.rsp is cleared.
// On failure after the TFS was acquired, the partially created vnode is closed/destroyed and
// the TFS reference is released.
int32_t vmProcessMountVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t          code = 0, lino = 0;
  SMountVnodeReq   req = {0};
  SCreateVnodeReq *pCreateReq = &req.createReq;
  SVnodeCfg        vnodeCfg = {0};
  SWrapperCfg      wrapperCfg = {0};
  SVnode          *pImpl = NULL;
  STfs            *pMountTfs = NULL;
  char             path[TSDB_FILENAME_LEN] = {0};
  bool             releaseTfs = false;

  if (tDeserializeSMountVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    dError("vgId:%d, failed to mount vnode since deserialize request error", pCreateReq->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pCreateReq->learnerReplica == 0) {
    pCreateReq->learnerSelfIndex = -1;
  }
  for (int32_t i = 0; i < pCreateReq->replica; ++i) {
    dInfo("mount:%s, vgId:%d, replica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->replicas[i].fqdn, pCreateReq->replicas[i].port, pCreateReq->replicas[i].id);
  }
  for (int32_t i = 0; i < pCreateReq->learnerReplica; ++i) {
    // Report the learner's own dnode id (previously the voter array was read here).
    dInfo("mount:%s, vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.mountName, pCreateReq->vgId, i,
          pCreateReq->learnerReplicas[i].fqdn, pCreateReq->learnerReplicas[i].port,
          pCreateReq->learnerReplicas[i].id);
  }

  // The index selected below is used to subscript a replica array, so it must lie inside the
  // populated range; otherwise the message is malformed.
  bool selfIsVoter = (pCreateReq->selfIndex != -1);
  bool selfIndexOk = selfIsVoter
                         ? (pCreateReq->selfIndex >= 0 && pCreateReq->selfIndex < pCreateReq->replica)
                         : (pCreateReq->learnerSelfIndex >= 0 &&
                            pCreateReq->learnerSelfIndex < pCreateReq->learnerReplica);
  if (!selfIndexOk) {
    code = TSDB_CODE_INVALID_MSG;
    dError("mount:%s, vgId:%d, invalid selfIndex:%d learnerSelfIndex:%d, reason:%s", req.mountName,
           pCreateReq->vgId, pCreateReq->selfIndex, pCreateReq->learnerSelfIndex, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (selfIsVoter) {
    pReplica = &pCreateReq->replicas[pCreateReq->selfIndex];
  } else {
    pReplica = &pCreateReq->learnerReplicas[pCreateReq->learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    // Log first: pReplica and mountName live inside req, which is released right after.
    dError("mount:%s, vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.mountName,
           pCreateReq->vgId, pReplica->id, pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSMountVnodeReq(&req);
    return code;
  }
  vmGenerateVnodeCfg(pCreateReq, &vnodeCfg);
  vnodeCfg.mountVgId = req.mountVgId;
  vmGenerateWrapperCfg(pMgmt, pCreateReq, &wrapperCfg);
  wrapperCfg.mountId = req.mountId;

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, pCreateReq->vgId, false);
  if (pVnode != NULL && (pCreateReq->replica == 1 || !pVnode->failed)) {
    dError("mount:%s, vgId:%d, already exist", req.mountName, pCreateReq->vgId);
    (void)tFreeSMountVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): this path has always returned 0 even though it is logged as an error
    // (the former assignment of TSDB_CODE_VND_ALREADY_EXIST to `code` was never returned).
    // Kept as-is; confirm that "already exists" is meant to be reported as success.
    return 0;
  }
  vmReleaseVnode(pMgmt, pVnode);

  wrapperCfg.diskPrimary = req.diskPrimary;
  (void)snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  TAOS_CHECK_EXIT(vmAcquireMountTfs(pMgmt, req.mountId, req.mountName, req.mountPath, &pMountTfs));
  releaseTfs = true;

  TAOS_CHECK_EXIT(vmMountVnode(pMgmt, path, &vnodeCfg, wrapperCfg.diskPrimary, &req, pMountTfs));
  if (!(pImpl = vnodeOpen(path, 0, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, true))) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : -1);
  }
  if ((code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl)) != 0) {
    TAOS_CHECK_EXIT(terrno != 0 ? terrno : code);
  }
  TAOS_CHECK_EXIT(vnodeStart(pImpl));
  TAOS_CHECK_EXIT(vmWriteVnodeListToFile(pMgmt));
  TAOS_CHECK_EXIT(vmWriteMountListToFile(pMgmt));
_exit:
  vmCleanPrimaryDisk(pMgmt, pCreateReq->vgId);
  if (code != 0) {
    dError("mount:%s, vgId:%d, msgType:%s, failed at line %d to mount vnode since %s", req.mountName, pCreateReq->vgId,
           TMSG_INFO(pMsg->msgType), lino, tstrerror(code));
    vmCloseFailedVnode(pMgmt, pCreateReq->vgId);
    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
    if (releaseTfs) vmReleaseMountTfs(pMgmt, req.mountId, 1);
  } else {
    dInfo("mount:%s, vgId:%d, msgType:%s, success to mount vnode", req.mountName, pCreateReq->vgId,
          TMSG_INFO(pMsg->msgType));
  }

  pMsg->code = code;
  pMsg->info.rsp = NULL;
  pMsg->info.rspLen = 0;

  (void)tFreeSMountVnodeReq(&req);
  TAOS_RETURN(code);
}
1258
#endif  // USE_MOUNT
1259

1260
// alter replica doesn't use this, but restore dnode still uses this
1261
// Handle an alter-vnode-type request.
//
// The request is accepted only when the local vnode exists, its current role is not
// TAOS_SYNC_ROLE_VOTER, vnodeIsCatchUp() reports 1, the replica indices in the request are
// consistent, and the replica entry addressed to this node matches the local dnode id / port /
// fqdn. The vnode is then closed, its replica configuration is rewritten with
// vnodeAlterReplica(), and it is reopened and started.
//
// Returns 0 on success; on failure returns -1 (terrno is set explicitly on the validation
// paths, and left as set by the failing callee on the close/alter/open paths).
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner COUNT. The previous code compared the learnerReplicas array itself with 0,
  // which can never be true, so the index was never normalized.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // terrstr() here reports whatever vmAcquireVnode left in terrno; it is overwritten next.
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // At least one of the two self indices must be set, and each must stay below its count.
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in local, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot everything needed to reopen the vnode; pVnode is not used after vmCloseVnode().
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
1380

1381
// Handle a check-learner-catchup request.
//
// Succeeds (returns 0) only when the vnode exists, its current role is not
// TAOS_SYNC_ROLE_VOTER, and vnodeIsCatchUp() reports 1. No vnode state is modified.
// On failure returns -1 with terrno set to the reason.
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner COUNT; the previous code compared the learnerReplicas array itself
  // with 0, which can never be true.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // terrstr() reports what vmAcquireVnode left in terrno; it is overwritten next.
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
1428

1429
// Handle a disable-vnode-write request: copy req.disable into the target vnode's `disable`
// flag. Returns 0 on success; on failure returns -1 with terrno set (TSDB_CODE_INVALID_MSG
// when the request cannot be deserialized, TSDB_CODE_VND_NOT_EXIST when the vnode is absent).
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // Nothing was acquired, so there is nothing to release on this path.
    // terrstr() reports what vmAcquireVnode left in terrno; it is overwritten next.
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->disable = req.disable;
  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
1450

1451
// Handle an alter-hash-range request, which turns vnode srcVgId into vnode dstVgId.
//
// Flow: reject if dstVgId already exists; acquire srcVgId; record the pending target in
// pVnode->toVgId and persist the vnode list; close the source vnode; rewrite it on disk with
// vnodeAlterHashRange(); open and start the vnode at the destination path; persist the vnode
// list again.
//
// Returns 0 on success; on failure returns -1 (terrno is set explicitly on the validation
// paths, and left as set by the failing callee otherwise).
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    // terrstr() reports what vmAcquireVnode left in terrno; it is overwritten next.
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Snapshot what is needed to reopen the vnode; pVnode is not used after vmCloseVnode().
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // Nothing has been changed on disk yet: undo the in-memory marker and drop the reference
    // taken above so the source vnode is left exactly as it was found.
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
1536

1537
// Handle an alter-vnode-replica request.
//
// Validates the replica indices carried by the request and checks that the entry addressed to
// this node matches the local dnode id / port / fqdn. The vnode is then closed, its replica
// configuration is rewritten with vnodeAlterReplica(), and it is reopened and started.
//
// Returns 0 on success; on failure returns -1 (terrno is set explicitly on the validation
// paths, and left as set by the failing callee on the close/alter/open paths).
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  // The trailing "dnode:%d" field is the replica's dnode id (the port was printed twice before).
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // At least one of the two self indices must be set, and each must stay below its count.
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (alterReq.selfIndex != -1) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d ep:%s:%u in request, ep:%s:%u in lcoal, %s", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port, tsLocalFqdn, tsServerPort, tstrerror(terrno));
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // terrstr() reports what vmAcquireVnode left in terrno; it is overwritten next.
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot everything needed to reopen the vnode; pVnode is not used after vmCloseVnode().
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, NULL, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
1640

UNCOV
1641
// Handle an alter-elect-baseline request: forward alterReq.electBaseLine to the target vnode
// through vnodeSetElectBaseline().
//
// Returns TSDB_CODE_INVALID_MSG when the request cannot be deserialized, terrno when the
// vnode cannot be acquired, -1 when vnodeSetElectBaseline() reports failure, and 0 otherwise.
int32_t vmProcessAlterVnodeElectBaselineReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeElectBaselineReq baselineReq = {0};
  int32_t                     parseRc = tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &baselineReq);
  if (parseRc != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  const int32_t targetVgId = baselineReq.vgId;
  dInfo(
      "vgId:%d, process alter vnode elect-base-line msgType:%s, electBaseLine:%d",
      targetVgId, TMSG_INFO(pMsg->msgType), baselineReq.electBaseLine);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, targetVgId);
  if (!pTarget) {
    dError("vgId:%d, failed to alter replica since %s", targetVgId, terrstr());
    return terrno;
  }

  // The vnode reference is dropped on both outcomes, so compute the result first.
  int32_t result = 0;
  if (vnodeSetElectBaseline(pTarget->pImpl, baselineReq.electBaseLine) != 0) {
    result = -1;
  }

  vmReleaseVnode(pMgmt, pTarget);
  return result;
}
1666

1667
// Handle a drop-vnode request.
//
// The vnode is first flagged as dropped and the vnode list is persisted; if that write fails
// the flag is cleared again and the write's error code is returned. Otherwise the vnode is
// closed and the list is written a second time; a failure of the second write is only logged.
//
// Returns 0 on success, or an error code (also stored in terrno on the validation paths).
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq req = {0};
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  const int32_t targetVgId = req.vgId;
  dInfo("vgId:%d, start to drop vnode", targetVgId);

  // The request must be addressed to this dnode.
  if (pMgmt->pData->dnodeId != req.dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("vgId:%d, dnodeId:%d, %s", req.vgId, req.dnodeId, tstrerror(terrno));
    return terrno;
  }

  SVnodeObj *pTarget = vmAcquireVnodeImpl(pMgmt, targetVgId, false);
  if (!pTarget) {
    dInfo("vgId:%d, failed to drop since %s", targetVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  pTarget->dropped = 1;
  int32_t persistCode = vmWriteVnodeListToFile(pMgmt);
  if (persistCode != 0) {
    // Could not record the drop: undo the flag and give the reference back.
    pTarget->dropped = 0;
    vmReleaseVnode(pMgmt, pTarget);
    return persistCode;
  }

  vmCloseVnode(pMgmt, pTarget, false, false);
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", targetVgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", targetVgId);
  return 0;
}
1706

1707
// Handle an arbitrator heartbeat request.
//
// For every member listed in the request, the matching local vnode is asked for its arb token
// and told the request's arb term; members whose vnode is missing, or for which either call
// fails, are skipped. The collected members are serialized into pMsg->info.rsp.
//
// Returns -1 (terrno = TSDB_CODE_INVALID_MSG) when the request cannot be deserialized;
// otherwise returns terrno, which is TSDB_CODE_SUCCESS once the response has been attached.
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_DNODE_NOT_MATCH_WITH_LOCAL;
    dError("dnodeId:%d, %s", arbHbReq.dnodeId, tstrerror(terrno));
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    goto _OVER;
  }

  // Index with size_t so the loop bound comparison is not signed-vs-unsigned.
  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // terrno is returned unchanged here — presumably set by rpcMallocCont; TODO confirm.
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
1796

1797
SArray *vmGetMsgHandles() {
141,299✔
1798
  int32_t code = -1;
141,299✔
1799
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
141,299✔
1800
  if (pArray == NULL) goto _OVER;
141,299!
1801

1802
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1803
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
141,299!
1804
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
141,299!
1805
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
141,299!
1806
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1807
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1808
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1809
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1810
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1811
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSUBTABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1812
  if (dmSetMgmtHandle(pArray, TDMT_VND_VSTB_REF_DBS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1813
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1814
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1815
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1816
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1817
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1818
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1819
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1820
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1821
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1822
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1823
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1824
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1825
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1826
  if (dmSetMgmtHandle(pArray, TDMT_VND_SNODE_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1827
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1828
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1829
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1830
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1831
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1832
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1833
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
141,299!
1834
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
141,299!
1835
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1836
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1837
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1838
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1839
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1840
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1841
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1842
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1843
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1844
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_TRIM_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1845
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SCAN_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1846
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1847
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1848
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1849
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1850
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1851
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1852
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1853

1854
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1855
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1856
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1857
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1858
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1859
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1860
  if (dmSetMgmtHandle(pArray, TDMT_VND_SCAN, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1861
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1862

1863
  if (dmSetMgmtHandle(pArray, TDMT_VND_LIST_SSMIGRATE_FILESETS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1864
  if (dmSetMgmtHandle(pArray, TDMT_VND_SSMIGRATE_FILESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1865
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_SSMIGRATE_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
141,299!
1866
  if (dmSetMgmtHandle(pArray, TDMT_VND_FOLLOWER_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1867
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_SSMIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1868

1869
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1870
  if (dmSetMgmtHandle(pArray, TDMT_DND_MOUNT_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1871
  if (dmSetMgmtHandle(pArray, TDMT_DND_RETRIEVE_MOUNT_PATH, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1872
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1873
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1874
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1875
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1876
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_ELECTBASELINE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1877

1878
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1879
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1880
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1881
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1882
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1883
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1884
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1885
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1886
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1887
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1888
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1889
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1890

1891
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
141,299!
1892
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
141,299!
1893
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
141,299!
1894
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
141,299!
1895
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
141,299!
1896

1897
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
141,299!
1898
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
141,299!
1899
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
141,299!
1900

1901
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
141,299!
1902
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER;
141,299!
1903
  code = 0;
141,299✔
1904

1905
_OVER:
141,299✔
1906
  if (code != 0) {
141,299!
1907
    taosArrayDestroy(pArray);
×
1908
    return NULL;
×
1909
  } else {
1910
    return pArray;
141,299✔
1911
  }
1912
}
1913

1914
/*
 * Publish the raw write metrics of every running vnode to the global metrics
 * system.
 *
 * pMgmt     - vnode management object; its running-vnode hash is walked while
 *             the read side of pMgmt->hashLock is held.
 * clusterId - cluster id forwarded to addWriteMetrics() with each sample.
 *
 * For each vnode that is not marked failed, the metrics are fetched, reported
 * through addWriteMetrics(), and, only if that succeeds, reset on the vnode.
 * A failure on one vnode is logged and does not stop the walk.
 */
void vmUpdateMetricsInfo(SVnodeMgmt *pMgmt, int64_t clusterId) {
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);

  // The iterator is advanced in the loop header so that every `continue`
  // below moves on to the next entry. Advancing only at the bottom of a
  // `while` body would make `continue` retry the same entry forever while
  // the lock is still held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) {
      continue;
    }

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) {
      continue;
    }

    SRawWriteMetrics metrics = {0};
    if (vnodeGetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to get write metrics for vgId: %d", pVnode->vgId);
      continue;
    }

    // The db name field of the parsed name is what gets reported with the sample.
    SName   name = {0};
    int32_t code = tNameFromString(&name, pVnode->pImpl->config.dbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code < 0) {
      dError("failed to get db name since %s", tstrerror(code));
      continue;
    }

    // Add the metrics to the global metrics system with cluster ID
    code = addWriteMetrics(pVnode->vgId, pMgmt->pData->dnodeId, clusterId, tsLocalEp, name.dbname, &metrics);
    if (code != TSDB_CODE_SUCCESS) {
      dError("Failed to add write metrics for vgId: %d, code: %d", pVnode->vgId, code);
      continue;
    }

    // Only reset the vnode's write metrics once they have been added successfully.
    if (vnodeResetRawWriteMetrics(pVnode->pImpl, &metrics) != 0) {
      dError("Failed to reset write metrics for vgId: %d", pVnode->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc