• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3546

03 Dec 2024 10:02AM UTC coverage: 60.691% (-0.1%) from 60.839%
#3546

push

travis-ci

web-flow
Merge pull request #29015 from taosdata/fix/TS-5668

[TS-5668] fix(keeper): fix endpoint value too long for column/tag and eliminate warnings

120577 of 253823 branches covered (47.5%)

Branch coverage included in aggregate %.

201666 of 277134 relevant lines covered (72.77%)

18719900.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.17
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taos_monitor.h"
18
#include "vmInt.h"
19

20
extern taos_counter_t *tsInsertCounter;
21

22
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
102,872✔
23
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
102,872✔
24
  if (pInfo->pVloads == NULL) return;
102,872!
25

26
  tfsUpdateSize(pMgmt->pTfs);
102,872✔
27

28
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
102,872✔
29

30
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
102,872✔
31
  while (pIter) {
1,085,047✔
32
    SVnodeObj **ppVnode = pIter;
982,175✔
33
    if (ppVnode == NULL || *ppVnode == NULL) continue;
982,175!
34

35
    SVnodeObj *pVnode = *ppVnode;
982,175✔
36
    SVnodeLoad vload = {.vgId = pVnode->vgId};
982,175✔
37
    if (!pVnode->failed) {
982,175!
38
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
982,175!
39
        dError("failed to get vnode load");
×
40
      }
41
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
982,175✔
42
    }
43
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
1,964,350!
44
      dError("failed to push vnode load");
×
45
    }
46
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
982,175✔
47
  }
48

49
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
102,872✔
50
}
51

52
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
×
53
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
×
54
  if (!pInfo->pVloads) return;
×
55

56
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
×
57

58
  void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
×
59
  while (pIter) {
×
60
    SVnodeObj **ppVnode = pIter;
×
61
    if (ppVnode == NULL || *ppVnode == NULL) continue;
×
62

63
    SVnodeObj *pVnode = *ppVnode;
×
64
    if (!pVnode->failed) {
×
65
      SVnodeLoadLite vload = {0};
×
66
      if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
×
67
        if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
×
68
          taosArrayDestroy(pInfo->pVloads);
×
69
          pInfo->pVloads = NULL;
×
70
          break;
×
71
        }
72
      }
73
    }
74
    pIter = taosHashIterate(pMgmt->runngingHash, pIter);
×
75
  }
76

77
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
×
78
}
79

80
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
12✔
81
  SMonVloadInfo vloads = {0};
12✔
82
  vmGetVnodeLoads(pMgmt, &vloads, true);
12✔
83

84
  SArray *pVloads = vloads.pVloads;
12✔
85
  if (pVloads == NULL) return;
12!
86

87
  int32_t totalVnodes = 0;
12✔
88
  int32_t masterNum = 0;
12✔
89
  int64_t numOfSelectReqs = 0;
12✔
90
  int64_t numOfInsertReqs = 0;
12✔
91
  int64_t numOfInsertSuccessReqs = 0;
12✔
92
  int64_t numOfBatchInsertReqs = 0;
12✔
93
  int64_t numOfBatchInsertSuccessReqs = 0;
12✔
94

95
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
38✔
96
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
26✔
97
    numOfSelectReqs += pLoad->numOfSelectReqs;
26✔
98
    numOfInsertReqs += pLoad->numOfInsertReqs;
26✔
99
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
26✔
100
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
26✔
101
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
26✔
102
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER || pLoad->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
26!
103
      masterNum++;
26✔
104
    }
105
    totalVnodes++;
26✔
106
  }
107

108
  pInfo->vstat.totalVnodes = totalVnodes;
12✔
109
  pInfo->vstat.masterNum = masterNum;
12✔
110
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs;
12✔
111
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs;                          // delta
12✔
112
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;            // delta
12✔
113
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;                // delta
12✔
114
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;  // delta
12✔
115
  pMgmt->state.totalVnodes = totalVnodes;
12✔
116
  pMgmt->state.masterNum = masterNum;
12✔
117
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
12✔
118
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
12✔
119
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
12✔
120
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
12✔
121
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
12✔
122

123
  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
12!
124
    dError("failed to get tfs monitor info");
×
125
  }
126
  taosArrayDestroy(pVloads);
12✔
127
}
128

129
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
12✔
130
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
12✔
131
  if (list_size == 0) return;
12!
132
  int32_t *vgroup_ids;
133
  char   **keys;
134
  int      r = 0;
×
135
  r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
×
136
  if (r) {
×
137
    dError("failed to get vgroup ids");
×
138
    return;
×
139
  }
140
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
×
141
  for (int i = 0; i < list_size; i++) {
×
142
    int32_t vgroup_id = vgroup_ids[i];
×
143
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
×
144
    if (vnode == NULL) {
×
145
      r = taos_counter_delete(tsInsertCounter, keys[i]);
×
146
      if (r) {
×
147
        dError("failed to delete monitor sample key:%s", keys[i]);
×
148
      }
149
    }
150
  }
151
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
×
152
  if (vgroup_ids) taosMemoryFree(vgroup_ids);
×
153
  if (keys) taosMemoryFree(keys);
×
154
  return;
×
155
}
156

157
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
11,390✔
158
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
11,390✔
159

160
  pCfg->vgId = pCreate->vgId;
11,390✔
161
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
11,390✔
162
  pCfg->dbId = pCreate->dbUid;
11,390✔
163
  pCfg->szPage = pCreate->pageSize * 1024;
11,390✔
164
  pCfg->szCache = pCreate->pages;
11,390✔
165
  pCfg->cacheLast = pCreate->cacheLast;
11,390✔
166
  pCfg->cacheLastSize = pCreate->cacheLastSize;
11,390✔
167
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
11,390✔
168
  pCfg->isWeak = true;
11,390✔
169
  pCfg->isTsma = pCreate->isTsma;
11,390✔
170
  pCfg->tsdbCfg.compression = pCreate->compression;
11,390✔
171
  pCfg->tsdbCfg.precision = pCreate->precision;
11,390✔
172
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
11,390✔
173
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
11,390✔
174
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
11,390✔
175
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
11,390✔
176
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
11,390✔
177
  pCfg->tsdbCfg.minRows = pCreate->minRows;
11,390✔
178
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
11,390✔
179
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
11,408✔
180
    SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
18✔
181
    memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
18✔
182
    if (i == 0) {
18✔
183
      if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
6!
184
    }
185
  }
186
#if defined(TD_ENTERPRISE)
187
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
11,390✔
188
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
11,390✔
189
    strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
1✔
190
  }
191
#else
192
  pCfg->tsdbCfg.encryptAlgorithm = 0;
193
#endif
194

195
  pCfg->walCfg.vgId = pCreate->vgId;
11,390✔
196
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
11,390✔
197
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
11,390✔
198
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
11,390✔
199
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
11,390✔
200
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
11,390✔
201
  pCfg->walCfg.level = pCreate->walLevel;
11,390✔
202
#if defined(TD_ENTERPRISE)
203
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
11,390✔
204
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
11,390✔
205
    strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
1✔
206
  }
207
#else
208
  pCfg->walCfg.encryptAlgorithm = 0;
209
#endif
210

211
#if defined(TD_ENTERPRISE)
212
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
11,390✔
213
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
11,390✔
214
    strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
1✔
215
  }
216
#else
217
  pCfg->tdbEncryptAlgorithm = 0;
218
#endif
219

220
  pCfg->sttTrigger = pCreate->sstTrigger;
11,390✔
221
  pCfg->hashBegin = pCreate->hashBegin;
11,390✔
222
  pCfg->hashEnd = pCreate->hashEnd;
11,390✔
223
  pCfg->hashMethod = pCreate->hashMethod;
11,390✔
224
  pCfg->hashPrefix = pCreate->hashPrefix;
11,390✔
225
  pCfg->hashSuffix = pCreate->hashSuffix;
11,390✔
226
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;
11,390✔
227

228
  pCfg->s3ChunkSize = pCreate->s3ChunkSize;
11,390✔
229
  pCfg->s3KeepLocal = pCreate->s3KeepLocal;
11,390✔
230
  pCfg->s3Compact = pCreate->s3Compact;
11,390✔
231

232
  pCfg->standby = 0;
11,390✔
233
  pCfg->syncCfg.replicaNum = 0;
11,390✔
234
  pCfg->syncCfg.totalReplicaNum = 0;
11,390✔
235
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
11,390✔
236

237
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
11,390✔
238
  for (int32_t i = 0; i < pCreate->replica; ++i) {
25,820✔
239
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
14,429✔
240
    pNode->nodeId = pCreate->replicas[pCfg->syncCfg.replicaNum].id;
14,429✔
241
    pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
14,429✔
242
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
14,429✔
243
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
14,429✔
244
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
14,429✔
245
    pCfg->syncCfg.replicaNum++;
14,430✔
246
  }
247
  if (pCreate->selfIndex != -1) {
11,391✔
248
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
11,171✔
249
  }
250
  for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
11,610✔
251
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
219✔
252
    pNode->nodeId = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].id;
219✔
253
    pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
219✔
254
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
219✔
255
    tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
219✔
256
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
219✔
257
    pCfg->syncCfg.totalReplicaNum++;
219✔
258
  }
259
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
11,391✔
260
  if (pCreate->learnerSelfIndex != -1) {
11,391✔
261
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
219✔
262
  }
263
}
11,391✔
264

265
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
11,387✔
266
  pCfg->vgId = pCreate->vgId;
11,387✔
267
  pCfg->vgVersion = pCreate->vgVersion;
11,387✔
268
  pCfg->dropped = 0;
11,387✔
269
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
11,387✔
270
}
11,387✔
271

272
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
11,388✔
273
  if (pReq->isTsma) {
11,388✔
274
    SMsgHead *smaMsg = pReq->pTsma;
31✔
275
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
31✔
276
    return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
31✔
277
  }
278
  return 0;
11,357✔
279
}
280

281
#if 0
282
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
283
  if (pReq->isTsma) {
284
    SMsgHead *smaMsg = pReq->pTsma;
285
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
286
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
287
  }
288
  return 0;
289
}
290
#endif
291

292
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
11,387✔
293
  SCreateVnodeReq req = {0};
11,387✔
294
  SVnodeCfg       vnodeCfg = {0};
11,387✔
295
  SWrapperCfg     wrapperCfg = {0};
11,387✔
296
  int32_t         code = -1;
11,387✔
297
  char            path[TSDB_FILENAME_LEN] = {0};
11,387✔
298

299
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
11,387!
300
    return TSDB_CODE_INVALID_MSG;
×
301
  }
302

303
  if (req.learnerReplica == 0) {
11,310✔
304
    req.learnerSelfIndex = -1;
11,106✔
305
  }
306

307
  dInfo(
11,310!
308
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
309
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
310
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d s3ChunkSize:%d s3KeepLocal:%d s3Compact:%d tsma:%d "
311
      "precision:%d compression:%d minRows:%d maxRows:%d"
312
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
313
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
314
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
315
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
316
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
317
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
318
      req.keepTimeOffset, req.s3ChunkSize, req.s3KeepLocal, req.s3Compact, req.isTsma, req.precision, req.compression,
319
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
320
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
321
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
322
      req.encryptAlgorithm);
323

324
  for (int32_t i = 0; i < req.replica; ++i) {
25,820✔
325
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
14,430!
326
          req.replicas[i].id);
327
  }
328
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
11,609✔
329
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
219!
330
          req.learnerReplicas[i].port, req.replicas[i].id);
331
  }
332

333
  SReplica *pReplica = NULL;
11,390✔
334
  if (req.selfIndex != -1) {
11,390✔
335
    pReplica = &req.replicas[req.selfIndex];
11,171✔
336
  } else {
337
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
219✔
338
  }
339
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
11,390!
340
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
11,390!
341
    code = TSDB_CODE_INVALID_MSG;
×
342
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
×
343
           pReplica->fqdn, pReplica->port, tstrerror(code));
344
    return code;
×
345
  }
346

347
  if (req.encryptAlgorithm == DND_CA_SM4) {
11,390✔
348
    if (strlen(tsEncryptKey) == 0) {
1!
349
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
×
350
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
×
351
      return code;
×
352
    }
353
  }
354

355
  vmGenerateVnodeCfg(&req, &vnodeCfg);
11,390✔
356

357
  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
11,388!
358
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
×
359
    goto _OVER;
×
360
  }
361

362
  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
11,387✔
363

364
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
11,387✔
365
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
11,389!
366
    dError("vgId:%d, already exist", req.vgId);
144!
367
    (void)tFreeSCreateVnodeReq(&req);
144✔
368
    vmReleaseVnode(pMgmt, pVnode);
144✔
369
    code = TSDB_CODE_VND_ALREADY_EXIST;
144✔
370
    return 0;
144✔
371
  }
372

373
  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
11,245✔
374
  if (diskPrimary < 0) {
11,242!
375
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
11,243✔
376
  }
377
  wrapperCfg.diskPrimary = diskPrimary;
11,245✔
378

379
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
11,245✔
380

381
  if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
11,245!
382
    dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
×
383
    vmReleaseVnode(pMgmt, pVnode);
×
384
    vmCleanPrimaryDisk(pMgmt, req.vgId);
×
385
    (void)tFreeSCreateVnodeReq(&req);
×
386
    code = terrno != 0 ? terrno : -1;
×
387
    return code;
×
388
  }
389

390
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
11,246✔
391
  if (pImpl == NULL) {
11,246!
392
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
393
    code = terrno != 0 ? terrno : -1;
×
394
    goto _OVER;
×
395
  }
396

397
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
11,246✔
398
  if (code != 0) {
11,246!
399
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
×
400
    code = terrno != 0 ? terrno : code;
×
401
    goto _OVER;
×
402
  }
403

404
#if 0
405
  code = vmTsmaProcessCreate(pImpl, &req);
406
  if (code != 0) {
407
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
408
    code = terrno;
409
    goto _OVER;
410
  }
411
#endif
412

413
  code = vnodeStart(pImpl);
11,246✔
414
  if (code != 0) {
11,246!
415
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
×
416
    goto _OVER;
×
417
  }
418

419
  code = vmWriteVnodeListToFile(pMgmt);
11,246✔
420
  if (code != 0) {
11,246!
421
    code = terrno != 0 ? terrno : code;
×
422
    goto _OVER;
×
423
  }
424

425
_OVER:
11,246✔
426
  vmCleanPrimaryDisk(pMgmt, req.vgId);
11,246✔
427

428
  if (code != 0) {
11,246!
429
    vmCloseFailedVnode(pMgmt, req.vgId);
×
430

431
    vnodeClose(pImpl);
×
432
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
×
433
  } else {
434
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
11,246!
435
          TMSG_INFO(pMsg->msgType));
436
  }
437

438
  (void)tFreeSCreateVnodeReq(&req);
11,246✔
439
  terrno = code;
11,246✔
440
  return code;
11,246✔
441
}
442

443
// alter replica doesn't use this, but restore dnode still use this
444
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5,588✔
445
  SAlterVnodeTypeReq req = {0};
5,588✔
446
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
5,588!
447
    terrno = TSDB_CODE_INVALID_MSG;
×
448
    return -1;
×
449
  }
450

451
  if (req.learnerReplicas == 0) {
452
    req.learnerSelfIndex = -1;
453
  }
454

455
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
5,588!
456
        TMSG_INFO(pMsg->msgType));
457

458
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
5,588✔
459
  if (pVnode == NULL) {
5,588!
460
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
461
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
462
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
463
    return -1;
×
464
  }
465

466
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
5,588✔
467
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
5,588!
468
  if (role == TAOS_SYNC_ROLE_VOTER) {
5,588!
469
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
470
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
471
    vmReleaseVnode(pMgmt, pVnode);
×
472
    return -1;
×
473
  }
474

475
  dInfo("vgId:%d, checking node catch up", req.vgId);
5,588!
476
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
5,588✔
477
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
5,370✔
478
    vmReleaseVnode(pMgmt, pVnode);
5,370✔
479
    return -1;
5,370✔
480
  }
481

482
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
218!
483

484
  int32_t vgId = req.vgId;
218✔
485
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
218!
486
        req.selfIndex, req.strict, req.changeVersion);
487
  for (int32_t i = 0; i < req.replica; ++i) {
808✔
488
    SReplica *pReplica = &req.replicas[i];
590✔
489
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
590!
490
  }
491
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
218!
492
    SReplica *pReplica = &req.learnerReplicas[i];
×
493
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
×
494
  }
495

496
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
218!
497
      req.learnerSelfIndex >= req.learnerReplica) {
218!
498
    terrno = TSDB_CODE_INVALID_MSG;
×
499
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
500
    vmReleaseVnode(pMgmt, pVnode);
×
501
    return -1;
×
502
  }
503

504
  SReplica *pReplica = NULL;
218✔
505
  if (req.selfIndex != -1) {
218!
506
    pReplica = &req.replicas[req.selfIndex];
218✔
507
  } else {
508
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
×
509
  }
510

511
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
218!
512
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
218!
513
    terrno = TSDB_CODE_INVALID_MSG;
×
514
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
515
           pReplica->port);
516
    vmReleaseVnode(pMgmt, pVnode);
×
517
    return -1;
×
518
  }
519

520
  dInfo("vgId:%d, start to close vnode", vgId);
218!
521
  SWrapperCfg wrapperCfg = {
218✔
522
      .dropped = pVnode->dropped,
218✔
523
      .vgId = pVnode->vgId,
218✔
524
      .vgVersion = pVnode->vgVersion,
218✔
525
      .diskPrimary = pVnode->diskPrimary,
218✔
526
  };
527
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
218✔
528

529
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
218✔
530
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
218✔
531

532
  int32_t diskPrimary = wrapperCfg.diskPrimary;
218✔
533
  char    path[TSDB_FILENAME_LEN] = {0};
218✔
534
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
218✔
535

536
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
218!
537
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
218!
538
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
539
    return -1;
×
540
  }
541

542
  dInfo("vgId:%d, begin to open vnode", vgId);
218!
543
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
218✔
544
  if (pImpl == NULL) {
218!
545
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
546
    return -1;
×
547
  }
548

549
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
218!
550
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
551
    return -1;
×
552
  }
553

554
  if (vnodeStart(pImpl) != 0) {
218!
555
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
556
    return -1;
×
557
  }
558

559
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
218!
560
        req.vgId, TMSG_INFO(pMsg->msgType));
561
  return 0;
218✔
562
}
563

564
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
×
565
  SCheckLearnCatchupReq req = {0};
×
566
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
×
567
    terrno = TSDB_CODE_INVALID_MSG;
×
568
    return -1;
×
569
  }
570

571
  if (req.learnerReplicas == 0) {
572
    req.learnerSelfIndex = -1;
573
  }
574

575
  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
×
576
        TMSG_INFO(pMsg->msgType));
577

578
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
×
579
  if (pVnode == NULL) {
×
580
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
×
581
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
582
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
583
    return -1;
×
584
  }
585

586
  ESyncRole role = vnodeGetRole(pVnode->pImpl);
×
587
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
×
588
  if (role == TAOS_SYNC_ROLE_VOTER) {
×
589
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
×
590
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
×
591
    vmReleaseVnode(pMgmt, pVnode);
×
592
    return -1;
×
593
  }
594

595
  dInfo("vgId:%d, checking node catch up", req.vgId);
×
596
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
×
597
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
×
598
    vmReleaseVnode(pMgmt, pVnode);
×
599
    return -1;
×
600
  }
601

602
  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
×
603

604
  vmReleaseVnode(pMgmt, pVnode);
×
605

606
  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
×
607
        TMSG_INFO(pMsg->msgType));
608

609
  return 0;
×
610
}
611

612
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
88✔
613
  SDisableVnodeWriteReq req = {0};
88✔
614
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
88!
615
    terrno = TSDB_CODE_INVALID_MSG;
×
616
    return -1;
×
617
  }
618

619
  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
88!
620

621
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
88✔
622
  if (pVnode == NULL) {
88!
623
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
×
624
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
625
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
626
    return -1;
×
627
  }
628

629
  pVnode->disable = req.disable;
88✔
630
  vmReleaseVnode(pMgmt, pVnode);
88✔
631
  return 0;
88✔
632
}
633

634
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
88✔
635
  SAlterVnodeHashRangeReq req = {0};
88✔
636
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
88!
637
    terrno = TSDB_CODE_INVALID_MSG;
×
638
    return -1;
×
639
  }
640

641
  int32_t srcVgId = req.srcVgId;
88✔
642
  int32_t dstVgId = req.dstVgId;
88✔
643

644
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
88✔
645
  if (pVnode != NULL) {
88!
646
    dError("vgId:%d, vnode already exist", dstVgId);
×
647
    vmReleaseVnode(pMgmt, pVnode);
×
648
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
×
649
    return -1;
×
650
  }
651

652
  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
88!
653
        req.dstVgId);
654
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
88✔
655
  if (pVnode == NULL) {
88!
656
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
×
657
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
658
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
659
    return -1;
×
660
  }
661

662
  SWrapperCfg wrapperCfg = {
88✔
663
      .dropped = pVnode->dropped,
88✔
664
      .vgId = dstVgId,
665
      .vgVersion = pVnode->vgVersion,
88✔
666
      .diskPrimary = pVnode->diskPrimary,
88✔
667
  };
668
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
88✔
669

670
  // prepare alter
671
  pVnode->toVgId = dstVgId;
88✔
672
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
88!
673
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
674
    return -1;
×
675
  }
676

677
  dInfo("vgId:%d, close vnode", srcVgId);
88!
678
  vmCloseVnode(pMgmt, pVnode, true, false);
88✔
679

680
  int32_t diskPrimary = wrapperCfg.diskPrimary;
88✔
681
  char    srcPath[TSDB_FILENAME_LEN] = {0};
88✔
682
  char    dstPath[TSDB_FILENAME_LEN] = {0};
88✔
683
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
88✔
684
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
88✔
685

686
  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
88!
687
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
88!
688
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
×
689
    return -1;
×
690
  }
691

692
  dInfo("vgId:%d, open vnode", dstVgId);
88!
693
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
88✔
694

695
  if (pImpl == NULL) {
88!
696
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
×
697
    return -1;
×
698
  }
699

700
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
88!
701
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
×
702
    return -1;
×
703
  }
704

705
  if (vnodeStart(pImpl) != 0) {
88!
706
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
×
707
    return -1;
×
708
  }
709

710
  // complete alter
711
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
88!
712
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
×
713
    return -1;
×
714
  }
715

716
  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
88!
717
  return 0;
88✔
718
}
719

720
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
1,273✔
721
  SAlterVnodeReplicaReq alterReq = {0};
1,273✔
722
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
1,273!
723
    terrno = TSDB_CODE_INVALID_MSG;
×
724
    return -1;
×
725
  }
726

727
  if (alterReq.learnerReplica == 0) {
1,273✔
728
    alterReq.learnerSelfIndex = -1;
919✔
729
  }
730

731
  int32_t vgId = alterReq.vgId;
1,273✔
732
  dInfo(
1,273!
733
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
734
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
735
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
736
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
737

738
  for (int32_t i = 0; i < alterReq.replica; ++i) {
4,737✔
739
    SReplica *pReplica = &alterReq.replicas[i];
3,464✔
740
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
3,464!
741
  }
742
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
1,627✔
743
    SReplica *pReplica = &alterReq.learnerReplicas[i];
354✔
744
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
354!
745
  }
746

747
  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
1,273!
748
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
1,273!
749
    terrno = TSDB_CODE_INVALID_MSG;
×
750
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
×
751
    return -1;
×
752
  }
753

754
  SReplica *pReplica = NULL;
1,273✔
755
  if (alterReq.selfIndex != -1) {
1,273!
756
    pReplica = &alterReq.replicas[alterReq.selfIndex];
1,273✔
757
  } else {
758
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
×
759
  }
760

761
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
1,273!
762
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
1,273!
763
    terrno = TSDB_CODE_INVALID_MSG;
×
764
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
×
765
           pReplica->port);
766
    return -1;
×
767
  }
768

769
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
1,273✔
770
  if (pVnode == NULL) {
1,273!
771
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
×
772
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
773
    if (pVnode) vmReleaseVnode(pMgmt, pVnode);
×
774
    return -1;
×
775
  }
776

777
  dInfo("vgId:%d, start to close vnode", vgId);
1,273!
778
  SWrapperCfg wrapperCfg = {
1,273✔
779
      .dropped = pVnode->dropped,
1,273✔
780
      .vgId = pVnode->vgId,
1,273✔
781
      .vgVersion = pVnode->vgVersion,
1,273✔
782
      .diskPrimary = pVnode->diskPrimary,
1,273✔
783
  };
784
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
1,273✔
785

786
  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
1,273✔
787
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
1,273✔
788

789
  int32_t diskPrimary = wrapperCfg.diskPrimary;
1,273✔
790
  char    path[TSDB_FILENAME_LEN] = {0};
1,273✔
791
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
1,273✔
792

793
  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
1,273!
794
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
1,273!
795
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
×
796
    return -1;
×
797
  }
798

799
  dInfo("vgId:%d, begin to open vnode", vgId);
1,273!
800
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,273✔
801
  if (pImpl == NULL) {
1,273!
802
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
×
803
    return -1;
×
804
  }
805

806
  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
1,273!
807
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
×
808
    return -1;
×
809
  }
810

811
  if (vnodeStart(pImpl) != 0) {
1,273!
812
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
×
813
    return -1;
×
814
  }
815

816
  dInfo(
1,273!
817
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
818
      "learnerSelfIndex:%d strict:%d",
819
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
820
      alterReq.learnerSelfIndex, alterReq.strict);
821
  return 0;
1,273✔
822
}
823

824
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
5,192✔
825
  int32_t       code = 0;
5,192✔
826
  SDropVnodeReq dropReq = {0};
5,192✔
827
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
5,192!
828
    terrno = TSDB_CODE_INVALID_MSG;
×
829
    return terrno;
×
830
  }
831

832
  int32_t vgId = dropReq.vgId;
5,192✔
833
  dInfo("vgId:%d, start to drop vnode", vgId);
5,192!
834

835
  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
5,192!
836
    terrno = TSDB_CODE_INVALID_MSG;
×
837
    dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
×
838
    return terrno;
×
839
  }
840

841
  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
5,192✔
842
  if (pVnode == NULL) {
5,192!
843
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
×
844
    terrno = TSDB_CODE_VND_NOT_EXIST;
×
845
    return terrno;
×
846
  }
847

848
  pVnode->dropped = 1;
5,192✔
849
  if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
5,192!
850
    pVnode->dropped = 0;
×
851
    vmReleaseVnode(pMgmt, pVnode);
×
852
    return code;
×
853
  }
854

855
  vmCloseVnode(pMgmt, pVnode, false, false);
5,192✔
856
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
5,192!
857
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
×
858
  }
859

860
  dInfo("vgId:%d, is dropped", vgId);
5,192!
861
  return 0;
5,192✔
862
}
863

864
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
9✔
865
  SVArbHeartBeatReq arbHbReq = {0};
9✔
866
  SVArbHeartBeatRsp arbHbRsp = {0};
9✔
867
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
9!
868
    terrno = TSDB_CODE_INVALID_MSG;
×
869
    return -1;
×
870
  }
871

872
  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
9!
873
    terrno = TSDB_CODE_INVALID_MSG;
×
874
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
×
875
    goto _OVER;
×
876
  }
877

878
  if (strlen(arbHbReq.arbToken) == 0) {
9!
879
    terrno = TSDB_CODE_INVALID_MSG;
×
880
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
×
881
    goto _OVER;
×
882
  }
883

884
  size_t size = taosArrayGetSize(arbHbReq.hbMembers);
9✔
885

886
  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
9✔
887
  strncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
9✔
888
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
9✔
889
  if (arbHbRsp.hbMembers == NULL) {
9!
890
    goto _OVER;
×
891
  }
892

893
  for (int32_t i = 0; i < size; i++) {
25✔
894
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
16✔
895
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
16✔
896
    if (pVnode == NULL) {
16!
897
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
×
898
      continue;
×
899
    }
900

901
    SVArbHbRspMember rspMember = {0};
16✔
902
    rspMember.vgId = pReqMember->vgId;
16✔
903
    rspMember.hbSeq = pReqMember->hbSeq;
16✔
904
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
16!
905
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
×
906
      vmReleaseVnode(pMgmt, pVnode);
×
907
      continue;
×
908
    }
909

910
    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
16!
911
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
×
912
      vmReleaseVnode(pMgmt, pVnode);
×
913
      continue;
×
914
    }
915

916
    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
32!
917
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
×
918
      vmReleaseVnode(pMgmt, pVnode);
×
919
      goto _OVER;
×
920
    }
921

922
    vmReleaseVnode(pMgmt, pVnode);
16✔
923
  }
924

925
  SRpcMsg rspMsg = {.info = pMsg->info};
9✔
926
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
9✔
927
  if (rspLen < 0) {
9!
928
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
929
    goto _OVER;
×
930
  }
931

932
  void *pRsp = rpcMallocCont(rspLen);
9✔
933
  if (pRsp == NULL) {
9!
934
    terrno = terrno;
×
935
    goto _OVER;
×
936
  }
937

938
  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
9!
939
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
940
    rpcFreeCont(pRsp);
×
941
    goto _OVER;
×
942
  }
943
  pMsg->info.rsp = pRsp;
9✔
944
  pMsg->info.rspLen = rspLen;
9✔
945

946
  terrno = TSDB_CODE_SUCCESS;
9✔
947

948
_OVER:
9✔
949
  tFreeSVArbHeartBeatReq(&arbHbReq);
9✔
950
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
9✔
951
  return terrno;
9✔
952
}
953

954
SArray *vmGetMsgHandles() {
2,449✔
955
  int32_t code = -1;
2,449✔
956
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,449✔
957
  if (pArray == NULL) goto _OVER;
2,449!
958

959
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
960
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
961
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
962
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
963
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
964
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
965
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
966
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
967
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
968
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
969
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
970
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
971
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
972
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
973
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
974
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
975
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
976
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
977
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
978
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
979
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
980
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
981
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
982
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
983
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
984
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
985
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
986
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
987
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
988
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
989
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
990
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
991
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
992
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
993
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
994
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,449!
995
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
996
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
997
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
998
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
999
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1000
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
1001
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1002
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1003
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
1004
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1005
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,449!
1006

1007
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1008
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1009
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1010
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1011
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1012
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1013
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1014
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1015
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1016
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1017
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1018
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1019
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1020
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1021
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1022
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1023
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1024
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1025
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1026
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1027
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1028
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1029
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,449!
1030

1031
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1032
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1033

1034
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1035
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1036
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1037
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1038
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1039
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1040
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1041
  if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1042
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1043
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1044
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1045
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1046
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1047

1048
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1049
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1050
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1051
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1052
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1053
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1054
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1055
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1056
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1057
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1058
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1059
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1060

1061
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,449!
1062
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,449!
1063
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,449!
1064
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,449!
1065
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,449!
1066

1067
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,449!
1068
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,449!
1069
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,449!
1070

1071
  code = 0;
2,449✔
1072

1073
_OVER:
2,449✔
1074
  if (code != 0) {
2,449!
1075
    taosArrayDestroy(pArray);
×
1076
    return NULL;
×
1077
  } else {
1078
    return pArray;
2,449✔
1079
  }
1080
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc