• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.75
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taos_monitor.h"
18
#include "vmInt.h"
19

20
extern taos_counter_t *tsInsertCounter;
21

22
/*
 * Build an array of SVnodeLoad entries, one per vnode found in
 * pMgmt->runngingHash, and store it in pInfo->pVloads.
 *
 * pMgmt   - vnode management object; its rwlock is read-locked while the hash
 *           is walked.
 * pInfo   - receives the newly created array in pVloads (NULL when the array
 *           could not be created). The array is not destroyed here.
 * isReset - when true, vnodeResetLoad() is called for every non-failed vnode
 *           after its load has been read.
 *
 * Vnodes flagged as failed are still reported, but only with their vgId set.
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  // The iterator is advanced in the for-header so that `continue` can never
  // skip the advance and spin forever while holding the lock.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
51

52
/*
 * Build an array of SVnodeLoadLite entries for the non-failed vnodes in
 * pMgmt->runngingHash and store it in pInfo->pVloads.
 *
 * A vnode is only added when vnodeGetLoadLite() returns 0 for it. If pushing
 * an entry fails, the array is destroyed, pInfo->pVloads is set to NULL and
 * the walk stops. pInfo->pVloads is also NULL when the array cannot be created.
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  // Advancing in the for-header keeps `continue` from re-visiting the same
  // entry forever while the lock is held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // NOTE(review): this leaves the hash iteration early; confirm whether the
      // hash API expects the iterator to be cancelled in that case.
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
79

80
/*
 * Aggregate the per-vnode loads into pInfo->vstat and mirror the same totals
 * into pMgmt->state, then fill pInfo->tfs via tfsGetMonitorInfo().
 *
 * The loads are fetched with vmGetVnodeLoads(..., true), i.e. with the reset
 * flag set. Nothing is written when that call yields no array.
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);
  if (loadInfo.pVloads == NULL) return;

  SArray *pLoads = loadInfo.pVloads;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pLoads)) {
    SVnodeLoad *pEntry = taosArrayGet(pLoads, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    // Both plain and assigned leaders are counted as masters.
    bool isLeader =
        (pEntry->syncState == TAOS_SYNC_STATE_LEADER || pEntry->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      leaderCnt++;
    }

    vnodeCnt++;
    ++idx;
  }

  // Snapshot reported to the monitor.
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;                      // delta
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;             // delta
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;            // delta
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;   // delta

  // Same totals cached on the management object.
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
128

129
/*
 * Delete samples from tsInsertCounter whose vgroup id is no longer present in
 * pMgmt->runngingHash.
 *
 * The key list and the matching vgroup id list are obtained from
 * taos_counter_get_vgroup_ids(); both lists are freed here after the scan.
 * The hash is consulted under the management read lock.
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
  if (list_size == 0) return;

  // Start from NULL so the pointers never hold indeterminate values, whatever
  // the getter does or does not write into them.
  int32_t *vgroup_ids = NULL;
  char   **keys = NULL;

  int r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
  if (r) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  for (int i = 0; i < list_size; i++) {
    int32_t vgroup_id = vgroup_ids[i];
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
    if (vnode != NULL) continue;  // vnode still running: keep its sample

    r = taos_counter_delete(tsInsertCounter, keys[i]);
    if (r) {
      dError("failed to delete monitor sample key:%s", keys[i]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  if (vgroup_ids) taosMemoryFree(vgroup_ids);
  if (keys) taosMemoryFree(keys);
}
156

157
/*
 * Fill *pCfg from a create-vnode request.
 *
 * pCfg is first overwritten with vnodeCfgDefault, then the database, tsdb,
 * WAL, hash, s3 and sync settings are copied from pCreate. Voter replicas are
 * placed first in syncCfg.nodeInfo, followed by learner replicas;
 * syncCfg.replicaNum counts the voters and syncCfg.totalReplicaNum counts
 * voters plus learners. syncCfg.myIndex is set from selfIndex, or from
 * replica + learnerSelfIndex when learnerSelfIndex is not -1.
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
    // Only the first retention entry decides whether rsma is enabled.
    if (i == 0) {
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
    }
  }
#if defined(TD_ENTERPRISE)
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgorithm = 0;
#endif

  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;
#if defined(TD_ENTERPRISE)
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->walCfg.encryptAlgorithm = 0;
#endif

#if defined(TD_ENTERPRISE)
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
    // NOTE(review): the tsdb and wal keys above are copied with
    // ENCRYPT_KEY_LEN + 1, this one with ENCRYPT_KEY_LEN; confirm against the
    // declared size of tdbEncryptKey whether the difference is intended.
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
  }
#else
  pCfg->tdbEncryptAlgorithm = 0;
#endif

  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->s3ChunkSize = pCreate->s3ChunkSize;
  pCfg->s3KeepLocal = pCreate->s3KeepLocal;
  pCfg->s3Compact = pCreate->s3Compact;

  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;

  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum doubles as the source index.
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
    // The result is deliberately discarded, as the original code never read it.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }

  // Learners follow the voters; totalReplicaNum temporarily counts learners only
  // and is used as the index into learnerReplicas.
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
8,110✔
264

265
/*
 * Initialise the wrapper config for a vnode that is about to be created:
 * ids and version come from the request, the dropped flag is cleared, and the
 * path is "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
8,108✔
271

272
/*
 * For a tsma vnode, hand the tsma message body (the bytes after the SMsgHead)
 * to smaGetTSmaDays() so it can write pCfg->tsdbCfg.days, and return its result.
 * Returns 0 without touching pCfg when the request is not a tsma request.
 */
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *pHead = pReq->pTsma;
  // contLen in the header includes the header itself; strip it off.
  uint32_t bodyLen = (uint32_t)(htonl(pHead->contLen) - sizeof(SMsgHead));
  return smaGetTSmaDays(pCfg, POINTER_SHIFT(pHead, sizeof(SMsgHead)), bodyLen, &pCfg->tsdbCfg.days);
}
280

281
#if 0
282
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
283
  if (pReq->isTsma) {
284
    SMsgHead *smaMsg = pReq->pTsma;
285
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
286
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
287
  }
288
  return 0;
289
}
290
#endif
291

292
/*
 * Handle a create-vnode request: decode it, check that it addresses this
 * dnode, create the vnode on disk, open it, register it with the manager,
 * start it and persist the vnode list.
 *
 * Returns 0 on success, and also 0 when a usable vnode with the same vgId
 * already exists. Otherwise returns an error code; failures detected after the
 * vnode configuration has been generated also store the code in terrno.
 * Failures from vnode open onward additionally close and destroy the
 * partially created vnode.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  SVnode         *pImpl = NULL;  // declared up front so no goto can jump past its initialisation
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d s3ChunkSize:%d s3KeepLocal:%d s3Compact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.s3ChunkSize, req.s3KeepLocal, req.s3Compact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Print the learner's own dnode id (the voter array was indexed here before).
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Same self-index validation as the alter handlers in this file; without it
  // a request carrying no valid self index would index an array with -1.
  if ((req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to create vnode since invalid self index, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex >= 0) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);  // the decoded request was leaked on this path before
    return code;
  }

  if (req.encryptAlgorithm == DND_CA_SM4) {
    if (strlen(tsEncryptKey) == 0) {
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
      (void)tFreeSCreateVnodeReq(&req);  // the decoded request was leaked on this path before
      return code;
    }
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
    // Nothing has been created yet, so there is nothing to close or destroy:
    // just release the request and report the error.
    (void)tFreeSCreateVnodeReq(&req);
    terrno = code;
    return code;
  }

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // An existing vnode is reported to the caller as success, as before.
    return 0;
  }

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

#if 0
  code = vmTsmaProcessCreate(pImpl, &req);
  if (code != 0) {
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }
#endif

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    // Roll back: drop the manager entry, close the vnode and remove its files.
    vmCloseFailedVnode(pMgmt, req.vgId);

    vnodeClose(pImpl);
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
441

442
// alter replica doesn't use this, but restore dnode still uses this
443
/*
 * Handle an alter-vnode-type request for a vnode that is not yet a voter.
 *
 * The vnode must exist, must not already have the voter role and must have
 * caught up; the request must also address this dnode. When all checks pass
 * the vnode is closed, its replica configuration is altered on disk, and it is
 * re-opened and started.
 *
 * Returns 0 on success and -1 on failure. terrno is set explicitly for the
 * validation failures; the later alter/open/start failures only log terrstr().
 */
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count. The original compared the learnerReplicas array
  // itself with 0, which can never be true.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // At least one self index must be valid, and both must be inside their arrays.
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Copy what is needed to re-register the vnode before it is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
562

563
/*
 * Handle a check-learner-catchup request: succeed only when the vnode exists,
 * does not already have the voter role, and vnodeIsCatchUp() reports 1.
 *
 * Returns 0 on success; otherwise returns -1 with terrno set to the reason.
 * The vnode reference taken here is always released before returning.
 */
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count. The original compared the learnerReplicas array
  // itself with 0, which can never be true.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
610

611
/*
 * Handle a disable-vnode-write request by copying req.disable into the
 * vnode's `disable` field.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_INVALID_MSG
 * when the message cannot be decoded, or to TSDB_CODE_VND_NOT_EXIST when the
 * vnode cannot be acquired.
 */
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // Nothing was acquired, so there is nothing to release on this path.
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->disable = req.disable;
  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
632

633
/*
 * Handle an alter-hash-range request that turns vnode srcVgId into dstVgId.
 *
 * Steps: verify dstVgId does not exist and srcVgId does, record the target id
 * on the source vnode and persist the vnode list, close the source vnode,
 * alter the hash range on disk (source path -> destination path), then open,
 * register and start the destination vnode and persist the vnode list again.
 *
 * Returns 0 on success and -1 on failure. terrno is set explicitly for the
 * decode/existence checks; later failures only log terrstr().
 */
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // The wrapper config of the destination inherits everything but the vgId.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // The source vnode is still open and referenced here; drop the reference
    // taken above so it is not leaked.
    // NOTE(review): toVgId stays set on the source vnode, as before; confirm
    // whether it should be rolled back when the list cannot be persisted.
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
718

719
// Handle an alter-replica request for one vnode.
//
// Steps: deserialize and validate the request, check that the replica entry
// addressed to this node matches the local dnode id / port / fqdn, close the
// running vnode, rewrite its replica configuration on disk, then reopen it,
// register it with the vnode management and start it again.
//
// Returns 0 on success. Returns -1 on failure; terrno is set explicitly for
// malformed requests and for a missing vnode, and is presumably set by the
// failing helper on the other paths (they are logged through terrstr()).
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Without learner replicas this node cannot be a learner.
  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // The last field is the dnode id (previously the port was logged twice).
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  // Pick the entry that describes this node. Any negative selfIndex (not only
  // -1) means "not a voting replica"; the validation above guarantees that
  // learnerSelfIndex is then a valid index, so no negative index is ever used.
  SReplica *pReplica = NULL;
  if (alterReq.selfIndex >= 0) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // Log the reason reported by the acquire call before overriding terrno.
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot the fields needed to reopen the vnode; pVnode is not referenced
  // again once vmCloseVnode() has been called.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
822

823
// Handle a drop-vnode request.
//
// The request must deserialize and be addressed to the local dnode. The vnode
// is flagged as dropped and the vnode list is written out before the vnode is
// closed; if that first write fails the flag is rolled back and the write's
// error code is returned. The list is written once more after the close, and
// a failure of that second write is only logged.
//
// Returns 0 on success, otherwise an error code (terrno, or the code returned
// by the first vmWriteVnodeListToFile() call).
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq req = {0};

  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  if (pMgmt->pData->dnodeId != req.dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d not matched with local dnode", vgId, req.dnodeId);
    return terrno;
  }

  SVnodeObj *pTarget = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (pTarget == NULL) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // Record the dropped state first; undo it if it cannot be written out.
  pTarget->dropped = 1;
  int32_t writeCode = vmWriteVnodeListToFile(pMgmt);
  if (writeCode != 0) {
    pTarget->dropped = 0;
    vmReleaseVnode(pMgmt, pTarget);
    return writeCode;
  }

  vmCloseVnode(pMgmt, pTarget, false, false);

  // Best effort: the vnode is already closed, so only report a failure here.
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
862

863
// Handle an arbitrator heartbeat request.
//
// For every member (vgId, hbSeq) listed in the request, the matching local
// vnode is looked up, its arb token is copied into the response member and its
// arb term is updated from the request. Members whose vnode cannot be acquired,
// or for which either vnode call fails, are logged and left out of the
// response. The serialized response is attached to pMsg->info.rsp / rspLen.
//
// Returns -1 (terrno = TSDB_CODE_INVALID_MSG) when the request cannot be
// deserialized; otherwise returns terrno, which is TSDB_CODE_SUCCESS on
// success.
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // The return value is terrno; make sure a failure is never reported as success.
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  // Skipped members may have left an error in terrno; the request itself succeeded.
  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
952

953
SArray *vmGetMsgHandles() {
1,881✔
954
  int32_t code = -1;
1,881✔
955
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
1,881✔
956
  if (pArray == NULL) goto _OVER;
1,881!
957

958
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
959
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
960
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
961
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
962
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
963
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
964
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
965
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
966
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
967
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
968
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
969
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
970
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
971
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
972
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
973
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
974
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
975
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
976
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
977
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
978
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
979
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
980
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
981
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
982
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
983
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
984
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
985
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
986
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
987
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
988
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
989
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
990
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
991
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
992
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
993
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
1,881!
994
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
995
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
996
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
997
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
998
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
999
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
1000
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1001
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1002
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
1003
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1004
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
1,881!
1005

1006
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1007
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1008
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1009
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1010
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1011
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1012
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1013
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1014
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1015
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1016
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1017
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1018
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1019
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1020
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1021
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1022
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1023
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1024
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1025
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1026
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1027
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1028
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
1,881!
1029

1030
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1031
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1032

1033
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1034
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1035
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1036
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1037
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1038
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1039
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1040
  if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1041
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1042
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1043
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1044
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1045
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1046

1047
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1048
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1049
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1050
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1051
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1052
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1053
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1054
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1055
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1056
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1057
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1058
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1059

1060
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,881!
1061
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,881!
1062
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,881!
1063
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,881!
1064
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
1,881!
1065

1066
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
1,881!
1067
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
1,881!
1068
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
1,881!
1069

1070
  code = 0;
1,881✔
1071

1072
_OVER:
1,881✔
1073
  if (code != 0) {
1,881!
1074
    taosArrayDestroy(pArray);
×
1075
    return NULL;
×
1076
  } else {
1077
    return pArray;
1,881✔
1078
  }
1079
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc