• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.17
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taos_monitor.h"
18
#include "vmInt.h"
19

20
extern taos_counter_t *tsInsertCounter;
21

22
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
104,600✔
23
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
104,600✔
24
  if (pInfo->pVloads == NULL) return;
104,600!
25

26
  tfsUpdateSize(pMgmt->pTfs);
104,600✔
27

28
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
104,600✔
29

30
  void *pIter = taosHashIterate(pMgmt->hash, NULL);
104,600✔
31
  while (pIter) {
1,123,454✔
32
    SVnodeObj **ppVnode = pIter;
1,018,854✔
33
    if (ppVnode == NULL || *ppVnode == NULL) continue;
1,018,854!
34

35
    SVnodeObj *pVnode = *ppVnode;
1,018,854✔
36
    SVnodeLoad vload = {.vgId = pVnode->vgId};
1,018,854✔
37
    if (!pVnode->failed) {
1,018,854!
38
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
1,018,854!
39
        dError("failed to get vnode load");
×
40
      }
41
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
1,018,854✔
42
    }
43
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
2,037,708!
44
      dError("failed to push vnode load");
×
45
    }
46
    pIter = taosHashIterate(pMgmt->hash, pIter);
1,018,854✔
47
  }
48

49
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
104,600✔
50
}
51

52
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
53
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
54
  if (!pInfo->pVloads) return;
×
55

56
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
×
57

58
  void *pIter = taosHashIterate(pMgmt->hash, NULL);
×
59
  while (pIter) {
×
60
    SVnodeObj **ppVnode = pIter;
×
61
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
62

63
    SVnodeObj *pVnode = *ppVnode;
×
64
    if (!pVnode->failed) {
×
65
      SVnodeLoadLite vload = {0};
×
66
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
67
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
68
          taosArrayDestroy(pInfo->pVloads);
×
69
          pInfo->pVloads = NULL;
×
70
          break;
×
71
        }
72
      }
73
    }
74
    pIter = taosHashIterate(pMgmt->hash, pIter);
×
75
  }
76

77
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
×
78
}
79

80
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
12✔
81
  SMonVloadInfo vloads = {0};
12✔
82
  vmGetVnodeLoads(pMgmt, &vloads, true);
12✔
83

84
  SArray *pVloads = vloads.pVloads;
12✔
85
  if (pVloads == NULL) return;
12!
86

87
  int32_t totalVnodes = 0;
12✔
88
  int32_t masterNum = 0;
12✔
89
  int64_t numOfSelectReqs = 0;
12✔
90
  int64_t numOfInsertReqs = 0;
12✔
91
  int64_t numOfInsertSuccessReqs = 0;
12✔
92
  int64_t numOfBatchInsertReqs = 0;
12✔
93
  int64_t numOfBatchInsertSuccessReqs = 0;
12✔
94

95
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
38✔
96
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
26✔
97
    numOfSelectReqs += pLoad->numOfSelectReqs;
26✔
98
    numOfInsertReqs += pLoad->numOfInsertReqs;
26✔
99
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
26✔
100
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
26✔
101
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
26✔
102
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
26!
103
      masterNum++;
26✔
104
    }
105
    totalVnodes++;
26✔
106
  }
107

108
  pInfo->vstat.totalVnodes = totalVnodes;
12✔
109
  pInfo->vstat.masterNum = masterNum;
12✔
110
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
12✔
111
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
12✔
112
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
12✔
113
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
12✔
114
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
12✔
115
  pMgmt->state.totalVnodes = totalVnodes;
12✔
116
  pMgmt->state.masterNum = masterNum;
12✔
117
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
12✔
118
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
12✔
119
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
12✔
120
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
12✔
121
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
12✔
122

123
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
12!
124
    dError("failed to get tfs monitor info");
×
125
  }
126
  taosArrayDestroy(pVloads);
12✔
127
}
128

129
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
12✔
130
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
12✔
131
  if (list_size == 0) return;
12!
132
  int32_t *vgroup_ids;
133
  char   **keys;
134
  int      r = 0;
×
135
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
136
  if (r) {
×
137
    dError("failed to get vgroup ids");
×
138
    return;
×
139
  }
140
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
×
141
  for (int i = 0; i < list_size; i++) {
×
142
    int32_t vgroup_id = vgroup_ids[i];
×
143
    void   *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t));
×
144
    if (vnode == NULL) {
×
145
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
146
      if (r) {
×
147
        dError("failed to delete monitor sample key:%s", keys[i]);
×
148
      }
149
    }
150
  }
151
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
×
152
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
153
  if (keys) taosMemoryFree(keys);
×
154
  return;
×
155
}
156

157
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
11,279✔
158
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
11,279✔
159

160
  pCfg->vgId = pCreate->vgId;
11,279✔
161
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
11,279✔
162
  pCfg->dbId = pCreate->dbUid;
11,279✔
163
  pCfg->szPage = pCreate->pageSize * 1024;
11,279✔
164
  pCfg->szCache = pCreate->pages;
11,279✔
165
  pCfg->cacheLast = pCreate->cacheLast;
11,279✔
166
  pCfg->cacheLastSize = pCreate->cacheLastSize;
11,279✔
167
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
11,279✔
168
  pCfg->isWeak = true;
11,279✔
169
  pCfg->isTsma = pCreate->isTsma;
11,279✔
170
  pCfg->tsdbCfg.compression = pCreate->compression;
11,279✔
171
  pCfg->tsdbCfg.precision = pCreate->precision;
11,279✔
172
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
11,279✔
173
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
11,279✔
174
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
11,279✔
175
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
11,279✔
176
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
11,279✔
177
  pCfg->tsdbCfg.minRows = pCreate->minRows;
11,279✔
178
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
11,279✔
179
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
11,297✔
180
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
21✔
181
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
21✔
182
    if (i == 0) {
18✔
183
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
6!
184
    }
185
  }
186
#if defined(TD_ENTERPRISE)
187
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
11,277✔
188
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
11,277✔
189
    strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
1✔
190
  }
191
#else
192
  pCfg->tsdbCfg.encryptAlgorithm = 0;
193
#endif
194

195
  pCfg->walCfg.vgId = pCreate->vgId;
11,277✔
196
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
11,277✔
197
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
11,277✔
198
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
11,277✔
199
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
11,277✔
200
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
11,277✔
201
  pCfg->walCfg.level = pCreate->walLevel;
11,277✔
202
#if defined(TD_ENTERPRISE)
203
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
11,277✔
204
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
11,277✔
205
    strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
1✔
206
  }
207
#else
208
  pCfg->walCfg.encryptAlgorithm = 0;
209
#endif
210

211
#if defined(TD_ENTERPRISE)
212
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
11,277✔
213
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
11,277✔
214
    strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
1✔
215
  }
216
#else
217
  pCfg->tdbEncryptAlgorithm = 0;
218
#endif
219

220
  pCfg->sttTrigger = pCreate->sstTrigger;
11,277✔
221
  pCfg->hashBegin = pCreate->hashBegin;
11,277✔
222
  pCfg->hashEnd = pCreate->hashEnd;
11,277✔
223
  pCfg->hashMethod = pCreate->hashMethod;
11,277✔
224
  pCfg->hashPrefix = pCreate->hashPrefix;
11,277✔
225
  pCfg->hashSuffix = pCreate->hashSuffix;
11,277✔
226
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
11,277✔
227

228
  pCfg->s3ChunkSize = pCreate->s3ChunkSize;
11,277✔
229
  pCfg->s3KeepLocal = pCreate->s3KeepLocal;
11,277✔
230
  pCfg->s3Compact = pCreate->s3Compact;
11,277✔
231

232
  pCfg->standby = 0;
11,277✔
233
  pCfg->syncCfg.replicaNum = 0;
11,277✔
234
  pCfg->syncCfg.totalReplicaNum = 0;
11,277✔
235
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
11,277✔
236

237
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
11,277✔
238
  for (int32_t i = 0; i < pCreate->replica; ++i) {
25,449✔
239
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
14,169✔
240
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
14,169✔
241
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
14,169✔
242
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
14,169✔
243
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
14,169✔
244
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
14,169✔
245
    pCfg->syncCfg.replicaNum++;
14,172✔
246
  }
247
  if (pCreate->selfIndex != -1) {
11,280✔
248
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
11,061✔
249
  }
250
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
11,499✔
251
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
219✔
252
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
219✔
253
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
219✔
254
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
219✔
255
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
219✔
256
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
219✔
257
    pCfg->syncCfg.totalReplicaNum++;
219✔
258
  }
259
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
11,280✔
260
  if (pCreate->learnerSelfIndex != -1) {
11,280✔
261
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
219✔
262
  }
263
}
11,280✔
264

265
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
11,278✔
266
  pCfg->vgId = pCreate->vgId;
11,278✔
267
  pCfg->vgVersion = pCreate->vgVersion;
11,278✔
268
  pCfg->dropped = 0;
11,278✔
269
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
11,278✔
270
}
11,278✔
271

272
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
11,278✔
273
  if (pReq->isTsma) {
11,278✔
274
    SMsgHead *smaMsg = pReq->pTsma;
31✔
275
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
31✔
276
    return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
31✔
277
  }
278
  return 0;
11,247✔
279
}
280

281
#if 0
282
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
283
  if (pReq->isTsma) {
284
    SMsgHead *smaMsg = pReq->pTsma;
285
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
286
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
287
  }
288
  return 0;
289
}
290
#endif
291

292
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
11,029✔
293
  SCreateVnodeReq req = {0};
11,029✔
294
  SVnodeCfg       vnodeCfg = {0};
11,029✔
295
  SWrapperCfg     wrapperCfg = {0};
11,029✔
296
  int32_t         code = -1;
11,029✔
297
  char            path[TSDB_FILENAME_LEN] = {0};
11,029✔
298

299
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
11,029!
300
    return TSDB_CODE_INVALID_MSG;
×
301
  }
302

303
  if (req.learnerReplica == 0) {
11,172✔
304
    req.learnerSelfIndex = -1;
10,978✔
305
  }
306

307
  dInfo(
11,172!
308
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
309
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
310
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d s3ChunkSize:%d s3KeepLocal:%d s3Compact:%d tsma:%d "
311
      "precision:%d compression:%d minRows:%d maxRows:%d"
312
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
313
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
314
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
315
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
316
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
317
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
318
      req.keepTimeOffset, req.s3ChunkSize, req.s3KeepLocal, req.s3Compact, req.isTsma, req.precision, req.compression,
319
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
320
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
321
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
322
      req.encryptAlgorithm);
323

324
  for (int32_t i = 0; i < req.replica; ++i) {
25,452✔
325
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
14,172!
326
          req.replicas[i].id);
327
  }
328
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
11,499✔
329
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
219!
330
          req.learnerReplicas[i].port, req.replicas[i].id);
331
  }
332

333
  SReplica *pReplica = NULL;
11,280✔
334
  if (req.selfIndex != -1) {
11,280✔
335
    pReplica = &req.replicas[req.selfIndex];
11,061✔
336
  } else {
337
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
219✔
338
  }
339
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
11,280!
340
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
11,279!
UNCOV
341
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
342
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
×
343
           pReplica->fqdn, pReplica->port, tstrerror(code));
344
    return code;
×
345
  }
346

347
  if (req.encryptAlgorithm == DND_CA_SM4) {
11,280✔
348
    if (strlen(tsEncryptKey) == 0) {
1!
349
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
350
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
351
      return code;
×
352
    }
353
  }
354

355
  vmGenerateVnodeCfg(&req, &vnodeCfg);
11,280✔
356

357
  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
11,278!
358
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
×
359
    goto _OVER;
×
360
  }
361

362
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
11,279✔
363

364
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
11,280✔
365
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
11,280!
366
    dError("vgId:%d, already exist", req.vgId);
66!
367
    (void)tFreeSCreateVnodeReq(&req);
66✔
368
    vmReleaseVnode(pMgmt, pVnode);
66✔
369
    code = TSDB_CODE_VND_ALREADY_EXIST;
66✔
370
    return 0;
66✔
371
  }
372

373
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
11,214✔
374
  if (diskPrimary < 0) {
11,211!
375
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
11,211✔
376
  }
377
  wrapperCfg.diskPrimary = diskPrimary;
11,214✔
378

379
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
11,214✔
380

381
  if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
11,214!
382
    dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
×
383
    vmReleaseVnode(pMgmt, pVnode);
×
384
    vmRemoveFromCreatingHash(pMgmt, req.vgId);
×
385
    (void)tFreeSCreateVnodeReq(&req);
×
386
    code = terrno != 0 ? terrno : -1;
×
387
    return code;
×
388
  }
389

390
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
11,214✔
391
  if (pImpl == NULL) {
11,214!
392
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
393
    code = terrno != 0 ? terrno : -1;
×
394
    goto _OVER;
×
395
  }
396

397
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
11,214✔
398
  if (code != 0) {
11,214!
399
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
400
    code = terrno != 0 ? terrno : code;
×
401
    goto _OVER;
×
402
  }
403

404
#if 0
405
  code = vmTsmaProcessCreate(pImpl, &req);
406
  if (code != 0) {
407
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
408
    code = terrno;
409
    goto _OVER;
410
  }
411
#endif
412

413
  code = vnodeStart(pImpl);
11,214✔
414
  if (code != 0) {
11,214!
415
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
416
    goto _OVER;
×
417
  }
418

419
  code = vmWriteVnodeListToFile(pMgmt);
11,214✔
420
  if (code != 0) {
11,214!
421
    code = terrno != 0 ? terrno : code;
×
422
    goto _OVER;
×
423
  }
424

425
_OVER:
11,214✔
426
  vmRemoveFromCreatingHash(pMgmt, req.vgId);
11,214✔
427

428
  if (code != 0) {
11,214!
429
    int32_t r = 0;
×
430
    r = taosThreadRwlockWrlock(&pMgmt->lock);
×
431
    if (r != 0) {
×
432
      dError("vgId:%d, failed to lock since %s", req.vgId, tstrerror(r));
×
433
    }
434
    if (r == 0) {
×
435
      dInfo("vgId:%d, remove from hash", req.vgId);
×
436
      r = taosHashRemove(pMgmt->hash, &req.vgId, sizeof(int32_t));
×
437
      if (r != 0) {
×
438
        dError("vgId:%d, failed to remove vnode since %s", req.vgId, tstrerror(r));
×
439
      }
440
    }
441
    r = taosThreadRwlockUnlock(&pMgmt->lock);
×
442
    if (r != 0) {
×
443
      dError("vgId:%d, failed to unlock since %s", req.vgId, tstrerror(r));
×
444
    }
445
    vnodeClose(pImpl);
×
446
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
447
  } else {
448
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
11,214!
449
          TMSG_INFO(pMsg->msgType));
450
  }
451

452
  (void)tFreeSCreateVnodeReq(&req);
11,214✔
453
  terrno = code;
11,214✔
454
  return code;
11,214✔
455
}
456

457
// alter replica doesn't use this, but restore dnode still use this
458
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5,098✔
459
  SAlterVnodeTypeReq req = {0};
5,098✔
460
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
5,098!
461
    terrno = TSDB_CODE_INVALID_MSG;
×
462
    return -1;
×
463
  }
464

465
  if (req.learnerReplicas == 0) {
466
    req.learnerSelfIndex = -1;
467
  }
468

469
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
5,098!
470
        TMSG_INFO(pMsg->msgType));
471

472
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
5,098✔
473
  if (pVnode == NULL) {
5,098!
474
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
475
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
476
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
477
    return -1;
×
478
  }
479

480
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
5,098✔
481
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
5,098!
482
  if (role == TAOS_SYNC_ROLE_VOTER) {
5,098!
483
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
484
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
485
    vmReleaseVnode(pMgmt, pVnode);
×
486
    return -1;
×
487
  }
488

489
  dInfo("vgId:%d, checking node catch up", req.vgId);
5,098!
490
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
5,098✔
491
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
4,880✔
492
    vmReleaseVnode(pMgmt, pVnode);
4,880✔
493
    return -1;
4,880✔
494
  }
495

496
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
218!
497

498
  int32_t vgId = req.vgId;
218✔
499
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
218!
500
        req.selfIndex, req.strict, req.changeVersion);
501
  for (int32_t i = 0; i < req.replica; ++i) {
808✔
502
    SReplica *pReplica = &req.replicas[i];
590✔
503
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
590!
504
  }
505
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
218!
506
    SReplica *pReplica = &req.learnerReplicas[i];
×
507
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
508
  }
509

510
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
218!
511
      req.learnerSelfIndex >= req.learnerReplica) {
218!
512
    terrno = TSDB_CODE_INVALID_MSG;
×
513
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
514
    vmReleaseVnode(pMgmt, pVnode);
×
515
    return -1;
×
516
  }
517

518
  SReplica *pReplica = NULL;
218✔
519
  if (req.selfIndex != -1) {
218!
520
    pReplica = &req.replicas[req.selfIndex];
218✔
521
  } else {
522
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
523
  }
524

525
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
218!
526
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
218!
527
    terrno = TSDB_CODE_INVALID_MSG;
×
528
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
529
           pReplica->port);
530
    vmReleaseVnode(pMgmt, pVnode);
×
531
    return -1;
×
532
  }
533

534
  dInfo("vgId:%d, start to close vnode", vgId);
218!
535
  SWrapperCfg wrapperCfg = {
218✔
536
      .dropped = pVnode->dropped,
218✔
537
      .vgId = pVnode->vgId,
218✔
538
      .vgVersion = pVnode->vgVersion,
218✔
539
      .diskPrimary = pVnode->diskPrimary,
218✔
540
  };
541
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
218✔
542

543
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
218✔
544
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
218✔
545

546
  int32_t diskPrimary = wrapperCfg.diskPrimary;
218✔
547
  char    path[TSDB_FILENAME_LEN] = {0};
218✔
548
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
218✔
549

550
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
218!
551
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
218!
552
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
553
    return -1;
×
554
  }
555

556
  dInfo("vgId:%d, begin to open vnode", vgId);
218!
557
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
218✔
558
  if (pImpl == NULL) {
218!
559
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
560
    return -1;
×
561
  }
562

563
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
218!
564
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
565
    return -1;
×
566
  }
567

568
  if (vnodeStart(pImpl) != 0) {
218!
569
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
570
    return -1;
×
571
  }
572

573
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
218!
574
        req.vgId, TMSG_INFO(pMsg->msgType));
575
  return 0;
218✔
576
}
577

578
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
579
  SCheckLearnCatchupReq req = {0};
×
580
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
581
    terrno = TSDB_CODE_INVALID_MSG;
×
582
    return -1;
×
583
  }
584

585
  if (req.learnerReplicas == 0) {
586
    req.learnerSelfIndex = -1;
587
  }
588

589
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
590
        TMSG_INFO(pMsg->msgType));
591

592
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
593
  if (pVnode == NULL) {
×
594
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
595
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
596
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
597
    return -1;
×
598
  }
599

600
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
601
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
602
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
603
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
604
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
605
    vmReleaseVnode(pMgmt, pVnode);
×
606
    return -1;
×
607
  }
608

609
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
610
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
611
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
612
    vmReleaseVnode(pMgmt, pVnode);
×
613
    return -1;
×
614
  }
615

616
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
617

618
  vmReleaseVnode(pMgmt, pVnode);
×
619

620
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
621
        TMSG_INFO(pMsg->msgType));
622

623
  return 0;
×
624
}
625

626
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
88✔
627
  SDisableVnodeWriteReq req = {0};
88✔
628
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
88!
629
    terrno = TSDB_CODE_INVALID_MSG;
×
630
    return -1;
×
631
  }
632

633
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
88!
634

635
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
88✔
636
  if (pVnode == NULL) {
88!
637
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
638
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
639
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
640
    return -1;
×
641
  }
642

643
  pVnode->disable = req.disable;
88✔
644
  vmReleaseVnode(pMgmt, pVnode);
88✔
645
  return 0;
88✔
646
}
647

648
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
88✔
649
  SAlterVnodeHashRangeReq req = {0};
88✔
650
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
88!
651
    terrno = TSDB_CODE_INVALID_MSG;
×
652
    return -1;
×
653
  }
654

655
  int32_t srcVgId = req.srcVgId;
88✔
656
  int32_t dstVgId = req.dstVgId;
88✔
657

658
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
88✔
659
  if (pVnode != NULL) {
88!
660
    dError("vgId:%d, vnode already exist", dstVgId);
×
661
    vmReleaseVnode(pMgmt, pVnode);
×
662
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
663
    return -1;
×
664
  }
665

666
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
88!
667
        req.dstVgId);
668
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
88✔
669
  if (pVnode == NULL) {
88!
670
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
671
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
672
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
673
    return -1;
×
674
  }
675

676
  SWrapperCfg wrapperCfg = {
88✔
677
      .dropped = pVnode->dropped,
88✔
678
      .vgId = dstVgId,
679
      .vgVersion = pVnode->vgVersion,
88✔
680
      .diskPrimary = pVnode->diskPrimary,
88✔
681
  };
682
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
88✔
683

684
  // prepare alter
685
  pVnode->toVgId = dstVgId;
88✔
686
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
88!
687
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
688
    return -1;
×
689
  }
690

691
  dInfo("vgId:%d, close vnode", srcVgId);
88!
692
  vmCloseVnode(pMgmt, pVnode, true, false);
88✔
693

694
  int32_t diskPrimary = wrapperCfg.diskPrimary;
88✔
695
  char    srcPath[TSDB_FILENAME_LEN] = {0};
88✔
696
  char    dstPath[TSDB_FILENAME_LEN] = {0};
88✔
697
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
88✔
698
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
88✔
699

700
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
88!
701
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
88!
702
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
703
    return -1;
×
704
  }
705

706
  dInfo("vgId:%d, open vnode", dstVgId);
88!
707
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
88✔
708

709
  if (pImpl == NULL) {
88!
710
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
711
    return -1;
×
712
  }
713

714
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
88!
715
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
716
    return -1;
×
717
  }
718

719
  if (vnodeStart(pImpl) != 0) {
88!
720
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
721
    return -1;
×
722
  }
723

724
  // complete alter
725
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
88!
726
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
727
    return -1;
×
728
  }
729

730
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
88!
731
  return 0;
88✔
732
}
733

734
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,271✔
735
  SAlterVnodeReplicaReq alterReq = {0};
1,271✔
736
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
1,271!
737
    terrno = TSDB_CODE_INVALID_MSG;
×
738
    return -1;
×
739
  }
740

741
  if (alterReq.learnerReplica == 0) {
1,271✔
742
    alterReq.learnerSelfIndex = -1;
917✔
743
  }
744

745
  int32_t vgId = alterReq.vgId;
1,271✔
746
  dInfo(
1,271!
747
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
748
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
749
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
750
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
751

752
  for (int32_t i = 0; i < alterReq.replica; ++i) {
4,731✔
753
    SReplica *pReplica = &alterReq.replicas[i];
3,460✔
754
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
3,460!
755
  }
756
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
1,625✔
757
    SReplica *pReplica = &alterReq.learnerReplicas[i];
354✔
758
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
354!
759
  }
760

761
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
1,271!
762
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
1,271!
763
    terrno = TSDB_CODE_INVALID_MSG;
×
764
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
765
    return -1;
×
766
  }
767

768
  SReplica *pReplica = NULL;
1,271✔
769
  if (alterReq.selfIndex != -1) {
1,271!
770
    pReplica = &alterReq.replicas[alterReq.selfIndex];
1,271✔
771
  } else {
772
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
773
  }
774

775
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,271!
776
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,271!
777
    terrno = TSDB_CODE_INVALID_MSG;
×
778
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
779
           pReplica->port);
780
    return -1;
×
781
  }
782

783
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
1,271✔
784
  if (pVnode == NULL) {
1,271!
785
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
786
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
787
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
788
    return -1;
×
789
  }
790

791
  dInfo("vgId:%d, start to close vnode", vgId);
1,271!
792
  SWrapperCfg wrapperCfg = {
1,271✔
793
      .dropped = pVnode->dropped,
1,271✔
794
      .vgId = pVnode->vgId,
1,271✔
795
      .vgVersion = pVnode->vgVersion,
1,271✔
796
      .diskPrimary = pVnode->diskPrimary,
1,271✔
797
  };
798
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
1,271✔
799

800
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
1,271✔
801
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
1,271✔
802

803
  int32_t diskPrimary = wrapperCfg.diskPrimary;
1,271✔
804
  char    path[TSDB_FILENAME_LEN] = {0};
1,271✔
805
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
1,271✔
806

807
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
1,271!
808
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
1,271!
809
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
810
    return -1;
×
811
  }
812

813
  dInfo("vgId:%d, begin to open vnode", vgId);
1,271!
814
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,271✔
815
  if (pImpl == NULL) {
1,271!
816
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
817
    return -1;
×
818
  }
819

820
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
1,271!
821
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
822
    return -1;
×
823
  }
824

825
  if (vnodeStart(pImpl) != 0) {
1,271!
826
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
827
    return -1;
×
828
  }
829

830
  dInfo(
1,271!
831
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
832
      "learnerSelfIndex:%d strict:%d",
833
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
834
      alterReq.learnerSelfIndex, alterReq.strict);
835
  return 0;
1,271✔
836
}
837

838
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5,166✔
839
  int32_t       code = 0;
5,166✔
840
  SDropVnodeReq dropReq = {0};
5,166✔
841
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
5,166!
842
    terrno = TSDB_CODE_INVALID_MSG;
×
843
    return terrno;
×
844
  }
845

846
  int32_t vgId = dropReq.vgId;
5,166✔
847
  dInfo("vgId:%d, start to drop vnode", vgId);
5,166!
848

849
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
5,166!
850
    terrno = TSDB_CODE_INVALID_MSG;
×
851
    dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
×
852
    return terrno;
×
853
  }
854

855
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
5,166✔
856
  if (pVnode == NULL) {
5,166!
857
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
858
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
859
    return terrno;
×
860
  }
861

862
  pVnode->dropped = 1;
5,166✔
863
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
5,166!
864
    pVnode->dropped = 0;
×
865
    vmReleaseVnode(pMgmt, pVnode);
×
866
    return code;
×
867
  }
868

869
  vmCloseVnode(pMgmt, pVnode, false, false);
5,166✔
870
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
5,166!
871
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
872
  }
873

874
  dInfo("vgId:%d, is dropped", vgId);
5,166!
875
  return 0;
5,166✔
876
}
877

878
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
7✔
879
  SVArbHeartBeatReq arbHbReq = {0};
7✔
880
  SVArbHeartBeatRsp arbHbRsp = {0};
7✔
881
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
7!
882
    terrno = TSDB_CODE_INVALID_MSG;
×
883
    return -1;
×
884
  }
885

886
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
7!
887
    terrno = TSDB_CODE_INVALID_MSG;
×
888
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
×
889
    goto _OVER;
×
890
  }
891

892
  if (strlen(arbHbReq.arbToken) == 0) {
7!
893
    terrno = TSDB_CODE_INVALID_MSG;
×
894
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
895
    goto _OVER;
×
896
  }
897

898
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
7✔
899

900
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
7✔
901
  strncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
7✔
902
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
7✔
903
  if (arbHbRsp.hbMembers == NULL) {
7!
904
    goto _OVER;
×
905
  }
906

907
  for (int32_t i = 0; i < size; i++) {
19✔
908
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
12✔
909
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
12✔
910
    if (pVnode == NULL) {
12!
911
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
×
912
      continue;
×
913
    }
914

915
    SVArbHbRspMember rspMember = {0};
12✔
916
    rspMember.vgId = pReqMember->vgId;
12✔
917
    rspMember.hbSeq = pReqMember->hbSeq;
12✔
918
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
12!
919
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
920
      vmReleaseVnode(pMgmt, pVnode);
×
921
      continue;
×
922
    }
923

924
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
12!
925
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
926
      vmReleaseVnode(pMgmt, pVnode);
×
927
      continue;
×
928
    }
929

930
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
24!
931
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
932
      vmReleaseVnode(pMgmt, pVnode);
×
933
      goto _OVER;
×
934
    }
935

936
    vmReleaseVnode(pMgmt, pVnode);
12✔
937
  }
938

939
  SRpcMsg rspMsg = {.info = pMsg->info};
7✔
940
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
7✔
941
  if (rspLen < 0) {
7!
942
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
943
    goto _OVER;
×
944
  }
945

946
  void *pRsp = rpcMallocCont(rspLen);
7✔
947
  if (pRsp == NULL) {
7!
948
    terrno = terrno;
×
949
    goto _OVER;
×
950
  }
951

952
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
7!
953
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
954
    rpcFreeCont(pRsp);
×
955
    goto _OVER;
×
956
  }
957
  pMsg->info.rsp = pRsp;
7✔
958
  pMsg->info.rspLen = rspLen;
7✔
959

960
  terrno = TSDB_CODE_SUCCESS;
7✔
961

962
_OVER:
7✔
963
  tFreeSVArbHeartBeatReq(&arbHbReq);
7✔
964
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
7✔
965
  return terrno;
7✔
966
}
967

968
SArray *vmGetMsgHandles() {
2,448✔
969
  int32_t code = -1;
2,448✔
970
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,448✔
971
  if (pArray == NULL) goto _OVER;
2,448!
972

973
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
974
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
975
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
976
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
977
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
978
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
979
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
980
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
981
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
982
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
983
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
984
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
985
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
986
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
987
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
988
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
989
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
990
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
991
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
992
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
993
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
994
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
995
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
996
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
997
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
998
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
999
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1000
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1001
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1002
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1003
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1004
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
1005
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1006
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1007
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
1008
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,448!
1009
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
1010
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
1011
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1012
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1013
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1014
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
1015
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1016
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1017
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
1018
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1019
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,448!
1020

1021
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1022
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1023
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1024
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1025
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1026
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1027
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1028
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1029
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1030
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1031
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1032
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1033
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1034
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1035
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1036
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1037
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1038
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1039
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1040
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1041
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1042
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1043
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,448!
1044

1045
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1046
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1047

1048
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1049
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1050
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1051
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1052
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1053
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1054
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1055
  if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1056
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1057
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1058
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1059
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1060
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1061

1062
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1063
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1064
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1065
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1066
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1067
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1068
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1069
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1070
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1071
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1072
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1073
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1074

1075
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,448!
1076
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,448!
1077
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,448!
1078
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,448!
1079
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,448!
1080

1081
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,448!
1082
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,448!
1083
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,448!
1084

1085
  code = 0;
2,448✔
1086

1087
_OVER:
2,448✔
1088
  if (code != 0) {
2,448!
1089
    taosArrayDestroy(pArray);
×
1090
    return NULL;
×
1091
  } else {
1092
    return pArray;
2,448✔
1093
  }
1094
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc