• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3532

20 Nov 2024 07:11AM UTC coverage: 60.78% (+0.6%) from 60.213%
#3532

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

119943 of 252352 branches covered (47.53%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

463 existing lines in 99 files now uncovered.

200682 of 275165 relevant lines covered (72.93%)

15642683.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.11
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taos_monitor.h"
18
#include "vmInt.h"
19

20
extern taos_counter_t *tsInsertCounter;
21

22
/*
 * Build an array with one SVnodeLoad entry per vnode found in pMgmt->hash and
 * store it in pInfo->pVloads.
 *
 * pMgmt   - vnode management object; its hash is walked under the read lock.
 * pInfo   - output; pVloads is set to the new array, or left NULL when the
 *           array cannot be created. The array is handed to the caller.
 * isReset - when true, vnodeResetLoad() is called for every non-failed vnode
 *           after its load has been read.
 *
 * Vnodes flagged as failed contribute an entry that carries only their vgId.
 * Failures of vnodeGetLoad() / taosArrayPush() are logged and otherwise ignored.
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  // The iterator is advanced in the for-header so that `continue` can never
  // re-visit the same slot; previously a NULL slot looped forever while the
  // read lock was held.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
51

52
/*
 * Build an array of SVnodeLoadLite entries for the non-failed vnodes in
 * pMgmt->hash and store it in pInfo->pVloads.
 *
 * pMgmt - vnode management object; its hash is walked under the read lock.
 * pInfo - output; pVloads is the new array, or NULL when the array could not
 *         be created or an element could not be appended (in the latter case
 *         the partially filled array is destroyed).
 *
 * Vnodes for which vnodeGetLoadLite() does not return 0 are skipped.
 */
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  // Advancing in the for-header guarantees progress on `continue`; the old
  // while-loop re-tested the same NULL slot forever with the lock held.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;
    if (pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      // Out of memory: report "no data" instead of a truncated list.
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
79

80
/*
 * Aggregate the per-vnode loads into dnode-wide counters.
 *
 * The loads are fetched with vmGetVnodeLoads(..., true), summed, and the
 * totals are written both to pInfo->vstat and to pMgmt->state. Finally the
 * tfs monitor info is read into pInfo->tfs (a failure there is only logged)
 * and the temporary load array is destroyed.
 *
 * Nothing is written when the load array could not be obtained.
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoadArr = loadInfo.pVloads;
  if (pLoadArr == NULL) return;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pLoadArr)) {
    SVnodeLoad *pEntry = taosArrayGet(pLoadArr, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    // Both plain and assigned leaders count as "master".
    if (pEntry->syncState == TAOS_SYNC_STATE_LEADER || pEntry->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
      leaderCnt++;
    }

    vnodeCnt++;
    ++idx;
  }

  // Cached copy on the management object.
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  // Report handed back to the monitor; the insert counters are deltas.
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }

  taosArrayDestroy(pLoadArr);
}
128

129
/*
 * Delete samples of tsInsertCounter whose vgroup id no longer maps to a vnode
 * in pMgmt->hash.
 *
 * The key / vgroup-id lists are obtained from taos_counter_get_vgroup_ids();
 * the hash is probed under the read lock and the two list containers are
 * freed before returning. Failures are logged and do not abort the sweep.
 */
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
  if (list_size == 0) return;

  // Initialized so the NULL tests before freeing are meaningful even if the
  // callee leaves an output untouched.
  int32_t *vgroup_ids = NULL;
  char   **keys = NULL;

  int r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
  if (r) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  for (int i = 0; i < list_size; i++) {
    int32_t vgroup_id = vgroup_ids[i];
    void   *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t));
    if (vnode != NULL) continue;  // vnode still exists, keep its sample

    r = taos_counter_delete(tsInsertCounter, keys[i]);
    if (r) {
      dError("failed to delete monitor sample key:%s", keys[i]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  if (vgroup_ids) taosMemoryFree(vgroup_ids);
  if (keys) taosMemoryFree(keys);
}
156

157
/*
 * Fill *pCfg for a new vnode: start from vnodeCfgDefault and overlay the
 * values carried by the create request.
 *
 * Sections, in order: general/tsdb settings, retentions, WAL settings,
 * encryption (enterprise builds only, otherwise forced to 0), hash range and
 * s3 settings, then the sync membership list (voters first, learners after).
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  /* ---- general ---- */
  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;

  /* ---- tsdb ---- */
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;

  /* ---- retentions: the first entry decides whether this is an rsma vnode ---- */
  const size_t numRetentions = taosArrayGetSize(pCreate->pRetensions);
  for (size_t r = 0; r < numRetentions; ++r) {
    SRetention *pDst = &pCfg->tsdbCfg.retentions[r];
    memcpy(pDst, taosArrayGet(pCreate->pRetensions, r), sizeof(SRetention));
    if (r != 0) continue;
    if (pDst->freq >= 0 && pDst->keep > 0) pCfg->isRsma = 1;
  }

  /* ---- wal ---- */
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;

  /* ---- encryption (tsdb / wal / tdb) ---- */
#if defined(TD_ENTERPRISE)
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
    strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
  }
  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
    strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
  }
  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
    strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
  }
#else
  pCfg->tsdbCfg.encryptAlgorithm = 0;
  pCfg->walCfg.encryptAlgorithm = 0;
  pCfg->tdbEncryptAlgorithm = 0;
#endif

  /* ---- hash range / paging / s3 ---- */
  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  pCfg->s3ChunkSize = pCreate->s3ChunkSize;
  pCfg->s3KeepLocal = pCreate->s3KeepLocal;
  pCfg->s3Compact = pCreate->s3Compact;

  /* ---- sync membership ---- */
  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum counts them as we go.
  for (int32_t slot = 0; slot < pCreate->replica; ++slot) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[slot];
    SReplica  *pSrc = &pCreate->replicas[pCfg->syncCfg.replicaNum];

    pNode->nodeId = pSrc->id;
    pNode->nodePort = pSrc->port;
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pNode->nodeFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
    // Result intentionally ignored, as before.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.replicaNum++;
  }

  // Learners follow the voters; totalReplicaNum temporarily counts learners
  // only and doubles as the index into learnerReplicas[].
  for (int32_t slot = pCfg->syncCfg.replicaNum; slot < pCreate->replica + pCreate->learnerReplica; ++slot) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[slot];
    SReplica  *pSrc = &pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum];

    pNode->nodeId = pSrc->id;
    pNode->nodePort = pSrc->port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;

  // A learner self index, when present, takes precedence over the voter one.
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
11,306✔
264

265
/*
 * Initialize the wrapper config of a vnode that is about to be created:
 * copies the vgroup id and version from the request, marks it as not dropped
 * and derives its path as "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
11,308✔
271

272
/*
 * For a TSMA vnode, let smaGetTSmaDays() adjust pCfg->tsdbCfg.days from the
 * TSMA message attached to the request; the payload starts right after the
 * SMsgHead and its length is the header's contLen minus the header size.
 *
 * Returns 0 for non-TSMA requests, otherwise whatever smaGetTSmaDays() returns.
 */
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *pHead = pReq->pTsma;
  uint32_t  bodyLen = (uint32_t)(htonl(pHead->contLen) - sizeof(SMsgHead));
  void     *pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  return smaGetTSmaDays(pCfg, pBody, bodyLen, &pCfg->tsdbCfg.days);
}
280

281
#if 0
282
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
283
  if (pReq->isTsma) {
284
    SMsgHead *smaMsg = pReq->pTsma;
285
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
286
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
287
  }
288
  return 0;
289
}
290
#endif
291

292
/*
 * Handle a create-vnode request.
 *
 * Steps: deserialize and log the request, check that the replica entry that
 * refers to "self" matches this dnode, build the vnode and wrapper configs,
 * pick a primary disk, then create, open, register and start the vnode and
 * persist the vnode list.
 *
 * Returns 0 on success. A request for a vnode that already exists (and is
 * either single-replica or not marked failed) is also answered with 0.
 * On failure a non-zero code is returned; failures after the vnode directory
 * was created additionally remove the vnode from the hash, close it and
 * destroy its directory, and set terrno to the returned code.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};
  // Declared up front so that no `goto _OVER` can jump over its initialization.
  SVnode         *pImpl = NULL;

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d s3ChunkSize:%d s3KeepLocal:%d s3Compact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.s3ChunkSize, req.s3KeepLocal, req.s3Compact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // Fixed: the dnode id used to be read from req.replicas[i].
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Same self-index sanity check the alter handlers in this file perform;
  // without it learnerReplicas[-1] could be read below.
  if ((req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to create vnode since invalid self index, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);  // previously leaked on this path
    return code;
  }

  if (req.encryptAlgorithm == DND_CA_SM4) {
    if (strlen(tsEncryptKey) == 0) {
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
      (void)tFreeSCreateVnodeReq(&req);  // previously leaked on this path
      return code;
    }
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
    // Nothing has been created yet, so there is nothing to roll back. The old
    // `goto _OVER` closed an uninitialized pImpl and destroyed an empty path.
    (void)tFreeSCreateVnodeReq(&req);
    terrno = code;
    return code;
  }

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): the original stored TSDB_CODE_VND_ALREADY_EXIST in `code`
    // but returned 0; the returned value is preserved here. Confirm that
    // answering a duplicate create with success is intended.
    return 0;
  }

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
    vmReleaseVnode(pMgmt, pVnode);
    (void)tFreeSCreateVnodeReq(&req);
    code = terrno != 0 ? terrno : -1;
    return code;
  }

  pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  if (code != 0) {
    // Roll back: unregister, close and remove whatever was created on disk.
    int32_t r = taosThreadRwlockWrlock(&pMgmt->lock);
    if (r != 0) {
      dError("vgId:%d, failed to lock since %s", req.vgId, tstrerror(r));
    } else {
      dInfo("vgId:%d, remove from hash", req.vgId);
      r = taosHashRemove(pMgmt->hash, &req.vgId, sizeof(int32_t));
      if (r != 0) {
        dError("vgId:%d, failed to remove vnode since %s", req.vgId, tstrerror(r));
      }
      // Only unlock a lock we actually hold.
      r = taosThreadRwlockUnlock(&pMgmt->lock);
      if (r != 0) {
        dError("vgId:%d, failed to unlock since %s", req.vgId, tstrerror(r));
      }
    }
    if (pImpl != NULL) {
      vnodeClose(pImpl);
    }
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
453

454
// alter replica doesn't use this, but restore dnode still use this
455
/*
 * Handle an alter-vnode-type request (promote a caught-up learner).
 *
 * The request is rejected (-1, terrno set) when the vnode does not exist,
 * already has the voter role, has not caught up yet, carries inconsistent
 * replica indexes, or its "self" replica does not match this dnode.
 * Otherwise the vnode is closed, its replica configuration is altered on
 * disk, and it is reopened, re-registered and restarted.
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Fixed: the test used to read `req.learnerReplicas == 0`, comparing the
  // array itself with 0 (never true). The learner *count* is what matters,
  // as in the create / alter-replica handlers.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot what is needed to re-register the vnode before it is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
574

575
/*
 * Handle a check-learner-catchup request: report whether the given vnode is a
 * learner that has caught up.
 *
 * Returns 0 when the vnode exists, does not have the voter role and
 * vnodeIsCatchUp() reports 1; otherwise returns -1 with terrno set to
 * TSDB_CODE_INVALID_MSG, TSDB_CODE_VND_NOT_EXIST,
 * TSDB_CODE_VND_ALREADY_IS_VOTER or TSDB_CODE_VND_NOT_CATCH_UP.
 * The vnode itself is not modified.
 */
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Fixed: `req.learnerReplicas == 0` compared the array with 0 and was never
  // true; the learner count is the intended operand.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
622

623
/*
 * Handle a disable-vnode-write request: copy req.disable into the `disable`
 * field of the addressed vnode.
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_INVALID_MSG when the
 * message cannot be deserialized, or terrno = TSDB_CODE_VND_NOT_EXIST when
 * the vnode cannot be acquired.
 */
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};
  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    // Nothing was acquired, so there is nothing to release here (the old
    // `if (pVnode) vmReleaseVnode(...)` in this branch could never run).
    dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->disable = req.disable;
  vmReleaseVnode(pMgmt, pVnode);
  return 0;
}
644

645
/*
 * Handle an alter-hash-range request: turn vnode srcVgId into vnode dstVgId
 * with a new hash range.
 *
 * Sequence: verify dstVgId does not exist yet and srcVgId does, record the
 * pending target in pVnode->toVgId and persist the vnode list ("prepare"),
 * close the source vnode, let vnodeAlterHashRange() rewrite it under the
 * destination path, open / register / start the destination vnode and persist
 * the vnode list again ("complete").
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Snapshot what is needed to register the destination vnode; the source
  // object is closed below.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  int32_t prevToVgId = pVnode->toVgId;
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // The source vnode stays open: undo the in-memory marker and drop the
    // reference taken above (it used to be leaked on this path).
    pVnode->toVgId = prevToVgId;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
730

731
/*
 * Handle an alter-vnode-replica request.
 *
 * The request is deserialized from pMsg and validated (replica counts, self
 * indexes, and that the selected replica entry matches the local dnode id,
 * port and fqdn). The vnode is then closed, its replica configuration is
 * rewritten via vnodeAlterReplica(), and the vnode is reopened and restarted.
 *
 * pMgmt: vnode management context.
 * pMsg:  RPC message; pCont/contLen carry a serialized SAlterVnodeReplicaReq.
 *
 * Returns 0 on success and -1 on failure. terrno is set explicitly for an
 * invalid message and for a vnode that cannot be acquired; the other failure
 * paths only log terrstr() as left by the failing call. Failures that happen
 * after vmCloseVnode() return without reopening the vnode.
 */
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Without learner replicas there is no learner slot for this node.
  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // The "dnode" field reports the replica's dnode id, not its port.
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  // Pick the entry describing this node. Any negative selfIndex (not only -1)
  // means "not a voting replica"; the validation above guarantees that in this
  // case learnerSelfIndex is a valid index, so neither array is read out of
  // bounds.
  SReplica *pReplica = NULL;
  if (alterReq.selfIndex >= 0) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // Log the acquire error first, then report a uniform code to the caller.
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot what is needed to reopen the vnode before it is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
834

835
/*
 * Handle a drop-vnode request.
 *
 * Deserializes an SDropVnodeReq from pMsg, checks that it targets the local
 * dnode, marks the vnode as dropped, persists the vnode list, closes the
 * vnode, and persists the vnode list once more.
 *
 * Returns 0 on success. On failure returns terrno (invalid message or vnode
 * not found) or the code from the first vmWriteVnodeListToFile() call; in the
 * latter case the dropped flag is rolled back and the vnode is released.
 * A failure of the second list write is only logged.
 */
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq req = {0};
  if (0 != tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &req)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  const int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  if (pMgmt->pData->dnodeId != req.dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d not matched with local dnode", req.vgId, req.dnodeId);
    return terrno;
  }

  SVnodeObj *pObj = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (!pObj) {
    // Log the acquire error before replacing terrno with the reported code.
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // Persist the dropped mark first; undo it if the list cannot be written.
  pObj->dropped = 1;
  int32_t writeCode = vmWriteVnodeListToFile(pMgmt);
  if (writeCode != 0) {
    pObj->dropped = 0;
    vmReleaseVnode(pMgmt, pObj);
    return writeCode;
  }

  vmCloseVnode(pMgmt, pObj, false, false);

  // Best effort: a failure here is logged but does not fail the request.
  if (0 != vmWriteVnodeListToFile(pMgmt)) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
874

875
/*
 * Handle an arbitrator heartbeat request.
 *
 * Deserializes an SVArbHeartBeatReq from pMsg, checks that it targets the
 * local dnode and carries a non-empty arb token, then for each requested
 * member acquires the vnode, reads its arb token, updates its arb term and
 * appends an entry to the response. Members whose vnode cannot be acquired or
 * queried are logged and left out of the response. The serialized response is
 * stored in pMsg->info.rsp / pMsg->info.rspLen.
 *
 * Returns -1 if the request cannot be deserialized; otherwise returns terrno,
 * which is TSDB_CODE_SUCCESS when the response was built.
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  // tstrncpy always NUL-terminates the destination, unlike strncpy.
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    goto _OVER;
  }

  for (int32_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    // NOTE(review): terrno is presumably set by rpcMallocCont -- confirm.
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
964

965
SArray *vmGetMsgHandles() {
2,405✔
966
  int32_t code = -1;
2,405✔
967
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
2,405✔
968
  if (pArray == NULL) goto _OVER;
2,405!
969

970
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
971
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
972
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
973
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
974
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
975
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
976
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
977
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
978
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
979
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
980
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
981
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
982
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
983
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
984
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
985
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
986
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
987
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
988
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
989
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
990
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
991
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
992
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
993
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
994
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
995
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
996
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
997
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
998
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
999
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1000
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1001
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
1002
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1003
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1004
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
1005
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
2,405!
1006
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
1007
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
1008
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1009
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1010
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1011
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
1012
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1013
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1014
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
1015
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1016
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
2,405!
1017

1018
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1019
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1020
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1021
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1022
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1023
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1024
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1025
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1026
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1027
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1028
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1029
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1030
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1031
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1032
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1033
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1034
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1035
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1036
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1037
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1038
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1039
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1040
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
2,405!
1041

1042
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1043
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1044

1045
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1046
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1047
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1048
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1049
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1050
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1051
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1052
  if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1053
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1054
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1055
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1056
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1057
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1058

1059
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1060
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1061
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1062
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1063
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1064
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1065
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1066
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1067
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1068
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1069
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1070
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1071

1072
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,405!
1073
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,405!
1074
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,405!
1075
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,405!
1076
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
2,405!
1077

1078
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
2,405!
1079
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
2,405!
1080
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
2,405!
1081

1082
  code = 0;
2,405✔
1083

1084
_OVER:
2,405✔
1085
  if (code != 0) {
2,405!
1086
    taosArrayDestroy(pArray);
×
1087
    return NULL;
×
1088
  } else {
1089
    return pArray;
2,405✔
1090
  }
1091
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc