• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taos_monitor.h"
18
#include "vmInt.h"
19

20
extern taos_counter_t *tsInsertCounter;
21

UNCOV
22
// Collect one SVnodeLoad entry per vnode found in the running hash.
//
// pInfo->pVloads is allocated here; when the allocation fails it stays NULL
// and nothing else is done. vmGetMonitorInfo() destroys the array after use,
// so other callers are expected to do the same.
//
// pMgmt   - vnode management object whose running vnodes are scanned
// pInfo   - output; pVloads receives the collected loads
// isReset - when true, vnodeResetLoad() is called for every non-failed vnode
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  tfsUpdateSize(pMgmt->pTfs);

  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  // The iterator is advanced in the for-increment, so skipping an empty slot
  // with `continue` still moves forward instead of spinning forever while the
  // read lock is held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    // A failed vnode is still reported, but only with its vgId filled in.
    SVnodeLoad vload = {.vgId = pVnode->vgId};
    if (!pVnode->failed) {
      if (vnodeGetLoad(pVnode->pImpl, &vload) != 0) {
        dError("failed to get vnode load");
      }
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
    }
    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      dError("failed to push vnode load");
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
51

52
// Collect one SVnodeLoadLite entry per non-failed vnode in the running hash.
//
// pInfo->pVloads is allocated here. It is left NULL when the allocation
// fails, and it is destroyed and reset to NULL when an element cannot be
// appended, so callers must check it before use.
//
// pMgmt - vnode management object whose running vnodes are scanned
// pInfo - output; pVloads receives the collected loads
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
  if (!pInfo->pVloads) return;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);

  // The iterator is advanced in the for-increment, so skipping an empty slot
  // with `continue` still moves forward instead of spinning forever while the
  // read lock is held.
  for (void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->runngingHash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    if (pVnode->failed) continue;

    SVnodeLoadLite vload = {0};
    if (vnodeGetLoadLite(pVnode->pImpl, &vload) != 0) continue;

    if (taosArrayPush(pInfo->pVloads, &vload) == NULL) {
      // Out of memory: drop the partial result rather than report a truncated list.
      taosArrayDestroy(pInfo->pVloads);
      pInfo->pVloads = NULL;
      // NOTE(review): leaving a taosHashIterate() walk early may require
      // taosHashCancelIterate() to release the current node - confirm against
      // the hash API.
      break;
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
}
79

UNCOV
80
// Aggregate the per-vnode loads into monitor statistics.
//
// The loads are fetched with isReset=true. The totals are written both to
// pInfo->vstat and to pMgmt->state, then the tfs monitor info is collected
// into pInfo->tfs. If the load array cannot be obtained nothing is written.
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoads = loadInfo.pVloads;
  if (pLoads == NULL) return;

  int32_t nVnodes = 0;
  int32_t nLeaders = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pLoads); ++idx) {
    SVnodeLoad *pEntry = taosArrayGet(pLoads, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    // Both regular and assigned leaders count towards masterNum.
    bool isLeader =
        (pEntry->syncState == TAOS_SYNC_STATE_LEADER) || (pEntry->syncState == TAOS_SYNC_STATE_ASSIGNED_LEADER);
    if (isLeader) {
      nLeaders++;
    }
    nVnodes++;
  }

  // Publish to the monitor report ...
  pInfo->vstat.totalVnodes = nVnodes;
  pInfo->vstat.masterNum = nLeaders;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;                        // delta
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;               // delta
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;              // delta
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;     // delta

  // ... and mirror the same totals into the management state.
  pMgmt->state.totalVnodes = nVnodes;
  pMgmt->state.masterNum = nLeaders;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  if (tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs) != 0) {
    dError("failed to get tfs monitor info");
  }
  taosArrayDestroy(pLoads);
}
128

UNCOV
129
// Delete insert-counter samples whose vgroup is no longer in the running hash.
//
// The key list and the matching vgroup ids are fetched from tsInsertCounter.
// Every key whose vgroup id is not found in pMgmt->runngingHash is deleted
// from the counter. Both arrays are freed before returning on the normal path.
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
  int list_size = taos_counter_get_keys_size(tsInsertCounter);
  if (list_size == 0) return;

  // Initialised to NULL so the frees below stay well defined even if the
  // callee reports success without filling one of the out-pointers.
  int32_t *vgroup_ids = NULL;
  char   **keys = NULL;

  int r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
  if (r) {
    dError("failed to get vgroup ids");
    return;
  }

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  for (int i = 0; i < list_size; i++) {
    int32_t vgroup_id = vgroup_ids[i];
    void   *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
    if (vnode != NULL) continue;  // vgroup is still running here, keep its sample

    r = taos_counter_delete(tsInsertCounter, keys[i]);
    if (r) {
      dError("failed to delete monitor sample key:%s", keys[i]);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  if (vgroup_ids) taosMemoryFree(vgroup_ids);
  if (keys) taosMemoryFree(keys);
}
156

UNCOV
157
// Build a vnode configuration from a create-vnode request.
//
// pCfg is first overwritten with vnodeCfgDefault and then every field carried
// by pCreate is copied over it: cache/buffer sizes, tsdb settings, retentions,
// wal settings, encryption (enterprise builds only), hash range, s3 options
// and finally the sync membership (voters first, learners after them).
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

  // ---- identity, cache and buffer
  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;

  // ---- tsdb
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keepTimeOffset = pCreate->keepTimeOffset;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;

  for (size_t idx = 0; idx < taosArrayGetSize(pCreate->pRetensions); ++idx) {
    SRetention *pDst = &pCfg->tsdbCfg.retentions[idx];
    memcpy(pDst, taosArrayGet(pCreate->pRetensions, idx), sizeof(SRetention));

    // Only the first retention decides whether isRsma is switched on.
    if (idx != 0) continue;
    if (pDst->freq >= 0 && pDst->keep > 0) pCfg->isRsma = 1;
  }

  // ---- wal
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;

  // ---- encryption for tsdb, wal and tdb (always 0 outside enterprise builds)
#if defined(TD_ENTERPRISE)
  pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }

  pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }

  pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
  if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
    tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
  }
#else
  pCfg->tsdbCfg.encryptAlgorithm = 0;
  pCfg->walCfg.encryptAlgorithm = 0;
  pCfg->tdbEncryptAlgorithm = 0;
#endif

  // ---- stt trigger, hash range, page size
  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  // ---- s3
  pCfg->s3ChunkSize = pCreate->s3ChunkSize;
  pCfg->s3KeepLocal = pCreate->s3KeepLocal;
  pCfg->s3Compact = pCreate->s3Compact;

  // ---- sync membership
  pCfg->standby = 0;
  pCfg->syncCfg.replicaNum = 0;
  pCfg->syncCfg.totalReplicaNum = 0;
  pCfg->syncCfg.changeVersion = pCreate->changeVersion;
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1]; replicaNum counts them as they are added.
  for (int32_t slot = 0; slot < pCreate->replica; ++slot) {
    SReplica  *pSrc = &pCreate->replicas[pCfg->syncCfg.replicaNum];
    SNodeInfo *pDst = &pCfg->syncCfg.nodeInfo[slot];

    pDst->nodeId = pSrc->id;
    pDst->nodePort = pSrc->port;
    pDst->nodeRole = TAOS_SYNC_ROLE_VOTER;
    tstrncpy(pDst->nodeFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pDst->nodeId, &pDst->clusterId, pDst->nodeFqdn, &pDst->nodePort);
    pCfg->syncCfg.replicaNum++;
  }

  // Learners follow the voters. While this loop runs, totalReplicaNum is used
  // as the learner counter; the voter count is added once the loop is done.
  for (int32_t slot = pCfg->syncCfg.replicaNum; slot < pCreate->replica + pCreate->learnerReplica; ++slot) {
    SReplica  *pSrc = &pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum];
    SNodeInfo *pDst = &pCfg->syncCfg.nodeInfo[slot];

    pDst->nodeId = pSrc->id;
    pDst->nodePort = pSrc->port;
    pDst->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pDst->nodeFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
    (void)tmsgUpdateDnodeInfo(&pDst->nodeId, &pDst->clusterId, pDst->nodeFqdn, &pDst->nodePort);
    pCfg->syncCfg.totalReplicaNum++;
  }
  pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;

  // A learner self index, when present, takes precedence over the voter one.
  if (pCreate->selfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->selfIndex;
  }
  if (pCreate->learnerSelfIndex != -1) {
    pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
  }
}
×
264

UNCOV
265
// Fill the wrapper configuration for a vnode that is about to be created.
//
// The path is "<pMgmt->path><TD_DIRSEP>vnode<vgId>", truncated to the size of
// pCfg->path. The vnode is recorded as not dropped.
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = vgId;
}
×
271

UNCOV
272
// Let the TSMA definition carried by the request adjust pCfg->tsdbCfg.days.
//
// Returns 0 right away for requests that are not TSMA requests; otherwise it
// returns whatever smaGetTSmaDays() returns. The payload handed to
// smaGetTSmaDays() is the part of pReq->pTsma that follows its SMsgHead.
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *pHead = pReq->pTsma;
  uint32_t  bodyLen = (uint32_t)(htonl(pHead->contLen) - sizeof(SMsgHead));
  return smaGetTSmaDays(pCfg, POINTER_SHIFT(pHead, sizeof(SMsgHead)), bodyLen, &pCfg->tsdbCfg.days);
}
280

281
#if 0
282
// Currently compiled out by the surrounding `#if 0`.
// Forwards the TSMA payload of a create request (the bytes after its
// SMsgHead) to vnodeProcessCreateTSma(); returns 0 for non-TSMA requests.
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *pHead = pReq->pTsma;
  uint32_t  bodyLen = (uint32_t)(htonl(pHead->contLen) - sizeof(SMsgHead));
  return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(pHead, sizeof(SMsgHead)), bodyLen);
}
290
#endif
291

UNCOV
292
// Handle a create-vnode request.
//
// Steps: deserialize the request, verify that the replica entry addressed to
// this dnode matches the local id/port/fqdn, build the vnode and wrapper
// configs, create the vnode on disk, open it, register it with the manager,
// start it and persist the vnode list.
//
// Returns 0 on success (and also when the vnode already exists, see below),
// otherwise an error code. On paths that reach _OVER, terrno is set to the
// returned code and a partially created vnode is closed and destroyed.
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};
  // Declared before any `goto _OVER` so the cleanup code never sees an
  // indeterminate pointer.
  SVnode *pImpl = NULL;

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
      "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
      ", days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset%d s3ChunkSize:%d s3KeepLocal:%d s3Compact:%d tsma:%d "
      "precision:%d compression:%d minRows:%d maxRows:%d"
      ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
      ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
      "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d encryptAlgorithm:%d",
      req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
      (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
      req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
      req.keepTimeOffset, req.s3ChunkSize, req.s3KeepLocal, req.s3Compact, req.isTsma, req.precision, req.compression,
      req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
      req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
      req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict, req.changeVersion,
      req.encryptAlgorithm);

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    // The dnode id comes from the learner entry itself, not from replicas[i].
    dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
          req.learnerReplicas[i].port, req.learnerReplicas[i].id);
  }

  // Same index validation as the alter handlers: at least one self index must
  // be present and each must stay inside its replica list, otherwise the
  // lookup below would index out of bounds.
  if ((req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to create vnode since invalid self index, reason:%s", req.vgId, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex >= 0) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
           pReplica->fqdn, pReplica->port, tstrerror(code));
    (void)tFreeSCreateVnodeReq(&req);  // was leaked on this path
    return code;
  }

  if (req.encryptAlgorithm == DND_CA_SM4) {
    if (strlen(tsEncryptKey) == 0) {
      code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
      dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
      (void)tFreeSCreateVnodeReq(&req);  // was leaked on this path
      return code;
    }
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
    // Nothing has been created, opened or allocated yet, so there is nothing
    // for the _OVER cleanup to undo; just release the request.
    (void)tFreeSCreateVnodeReq(&req);
    terrno = code;
    return code;
  }

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
  if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
    dError("vgId:%d, already exist", req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    // NOTE(review): the original stored TSDB_CODE_VND_ALREADY_EXIST in `code`
    // but still returned 0; the 0 is kept here - confirm that reporting
    // success for an existing vnode is intended.
    return 0;
  }

  int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
  if (diskPrimary < 0) {
    diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
  }
  wrapperCfg.diskPrimary = diskPrimary;

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

  if ((code = vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs)) < 0) {
    dError("vgId:%d, failed to create vnode since %s", req.vgId, tstrerror(code));
    vmReleaseVnode(pMgmt, pVnode);
    vmCleanPrimaryDisk(pMgmt, req.vgId);
    (void)tFreeSCreateVnodeReq(&req);
    return code;
  }

  pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : -1;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

#if 0
  code = vmTsmaProcessCreate(pImpl, &req);
  if (code != 0) {
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }
#endif

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno != 0 ? terrno : code;
    goto _OVER;
  }

_OVER:
  vmCleanPrimaryDisk(pMgmt, req.vgId);

  if (code != 0) {
    vmCloseFailedVnode(pMgmt, req.vgId);

    // pImpl is NULL when vnodeOpen() itself failed.
    if (pImpl != NULL) {
      vnodeClose(pImpl);
    }
    vnodeDestroy(0, path, pMgmt->pTfs, 0);
  } else {
    dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
          TMSG_INFO(pMsg->msgType));
  }

  (void)tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}
441

442
// The alter-replica flow does not use this handler, but restoring a dnode still does.
UNCOV
443
// Handle an alter-vnode-type request.
//
// The vnode must exist, must not already have the voter role and must have
// caught up. The request is then validated against the local dnode, the
// vnode is closed, its replica configuration is rewritten on disk with
// vnodeAlterReplica(), and the vnode is reopened and restarted.
//
// Returns 0 on success, -1 on failure; terrno is set explicitly on the
// validation failures handled in this function.
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner *count*. The previous `req.learnerReplicas == 0` compared
  // the replica array itself with 0, which is never true, so the learner self
  // index was never reset.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  int32_t vgId = req.vgId;
  dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", vgId, req.replica,
        req.selfIndex, req.strict, req.changeVersion);
  for (int32_t i = 0; i < req.replica; ++i) {
    SReplica *pReplica = &req.replicas[i];
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < req.learnerReplica; ++i) {
    SReplica *pReplica = &req.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  // At least one self index must be present and each must stay inside its list.
  if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
      req.learnerSelfIndex >= req.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  SReplica *pReplica = NULL;
  if (req.selfIndex != -1) {
    pReplica = &req.replicas[req.selfIndex];
  } else {
    pReplica = &req.learnerReplicas[req.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Snapshot everything needed to reopen the vnode before it is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
        req.vgId, TMSG_INFO(pMsg->msgType));
  return 0;
}
562

563
// Handle a check-learner-catchup request.
//
// Succeeds (returns 0) only when the vnode exists, does not already have the
// voter role, and vnodeIsCatchUp() reports 1. Otherwise returns -1 with
// terrno set to the matching error code. The vnode itself is not modified.
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner *count*. The previous `req.learnerReplicas == 0` compared
  // the replica array itself with 0, which is never true.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", req.vgId,
        TMSG_INFO(pMsg->msgType));

  return 0;
}
610

UNCOV
611
// Handle a disable-vnode-write request: copy req.disable into the `disable`
// field of the addressed vnode.
//
// Returns 0 on success; -1 with terrno set when the message cannot be
// deserialized (TSDB_CODE_INVALID_MSG) or the vnode cannot be acquired
// (TSDB_CODE_VND_NOT_EXIST).
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq req = {0};

  int32_t rc = tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req);
  if (rc != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, req.vgId);
  if (pTarget != NULL) {
    pTarget->disable = req.disable;
    vmReleaseVnode(pMgmt, pTarget);
    return 0;
  }

  // The reason is logged before terrno is overwritten with the generic code.
  dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
  terrno = TSDB_CODE_VND_NOT_EXIST;
  return -1;
}
632

UNCOV
633
// Handle an alter-vnode-hash-range request.
//
// The destination vgId must not exist yet and the source vnode must exist.
// The source vnode is marked with the destination id (toVgId) and the vnode
// list is persisted. The source is then closed, vnodeAlterHashRange() is run
// from the source path to the destination path, and the destination vnode is
// opened, registered and started. Finally the vnode list is persisted again.
//
// Returns 0 on success, -1 on failure.
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
  if (pVnode != NULL) {
    dError("vgId:%d, vnode already exist", dstVgId);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);
  pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Snapshot what is needed to open the destination vnode before the source is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  // prepare alter
  pVnode->toVgId = dstVgId;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    // The source vnode is still open and was acquired above, so release it
    // here, like the early-return paths of the other alter handlers do.
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true, false);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    srcPath[TSDB_FILENAME_LEN] = {0};
  char    dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);

  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  // complete alter
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}
718

UNCOV
719
/*
 * Handle an alter-replica request for one vnode.
 *
 * Steps, in order:
 *   1. decode the request and log the requested replica / learner layout;
 *   2. validate the replica count and the self indexes;
 *   3. verify that the entry selected by the self index describes this dnode
 *      (dnode id, port and fqdn);
 *   4. snapshot the wrapper config, close the running vnode, call
 *      vnodeAlterReplica() on its directory, then reopen and restart it.
 *
 * Returns 0 on success and -1 on failure. terrno is set here to
 * TSDB_CODE_INVALID_MSG for a malformed or mismatching request and to
 * TSDB_CODE_VND_NOT_EXIST when the vnode cannot be acquired; for the remaining
 * failures the called helper is presumably responsible for terrno
 * (NOTE(review): confirm).
 */
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Without learners there is no learner slot this dnode could occupy.
  if (alterReq.learnerReplica == 0) {
    alterReq.learnerSelfIndex = -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo(
      "vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d changeVersion:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

  // NOTE(review): the counts are not checked against the capacity of
  // replicas[] / learnerReplicas[] here; presumably the decoder bounds them.
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // The last field is the dnode id (it used to print the port twice).
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }
  for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
    SReplica *pReplica = &alterReq.learnerReplicas[i];
    dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
      alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  // After the validation above at least one self index is non-negative.
  // Test ">= 0" rather than "!= -1" so that a negative selfIndex other than -1
  // can never be used to index replicas[].
  SReplica *pReplica = NULL;
  if (alterReq.selfIndex >= 0) {
    pReplica = &alterReq.replicas[alterReq.selfIndex];
  } else {
    pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
  }

  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    // Log the reason reported by the acquire call before overriding terrno.
    // Nothing was acquired, so there is nothing to release on this path.
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Copy what is needed to reopen the vnode before pVnode is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
      .diskPrimary = pVnode->diskPrimary,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
  vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);

  int32_t diskPrimary = wrapperCfg.diskPrimary;
  char    path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, begin to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo(
      "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
      "learnerSelfIndex:%d strict:%d",
      vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
      alterReq.learnerSelfIndex, alterReq.strict);
  return 0;
}
822

UNCOV
823
/*
 * Handle a drop-vnode request.
 *
 * The request is decoded and must target this dnode. The vnode is then
 * flagged as dropped, the vnode list is written out, the vnode is closed and
 * the list is written once more.
 *
 * Returns 0 on success. Returns TSDB_CODE_INVALID_MSG (also stored in terrno)
 * for an undecodable or mis-addressed request, TSDB_CODE_VND_NOT_EXIST (also
 * stored in terrno) when the vnode cannot be acquired, or the code returned by
 * the first vmWriteVnodeListToFile() call when that call fails. A failure of
 * the second write is only logged.
 */
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq dropReq = {0};
  int32_t       decodeRet = tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq);
  if (decodeRet != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  int32_t vgId = dropReq.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  // The request must be addressed to this dnode.
  bool forLocalDnode = (dropReq.dnodeId == pMgmt->pData->dnodeId);
  if (!forLocalDnode) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d not matched with local dnode", vgId, dropReq.dnodeId);
    return terrno;
  }

  SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
  if (!pVnode) {
    // Report the acquire error first, then replace terrno with "not exist".
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return terrno;
  }

  // Write the list with the vnode flagged as dropped before closing it;
  // undo the flag if that write fails.
  pVnode->dropped = 1;
  int32_t writeCode = vmWriteVnodeListToFile(pMgmt);
  if (writeCode != 0) {
    pVnode->dropped = 0;
    vmReleaseVnode(pMgmt, pVnode);
    return writeCode;
  }

  vmCloseVnode(pMgmt, pVnode, false, false);

  // Second write after the close; a failure here does not fail the request.
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}
862

UNCOV
863
/*
 * Handle an arbitrator heartbeat request.
 *
 * The request must be addressed to this dnode and carry a non-empty arb
 * token. For every member listed in the request the matching vnode is
 * acquired, its arb token is read, its arb term is updated, and a response
 * member is appended. Members whose vnode cannot be acquired, or for which
 * either vnode call fails, are logged and left out of the response.
 *
 * On success the serialized response is stored in pMsg->info.rsp /
 * pMsg->info.rspLen and TSDB_CODE_SUCCESS is returned. A request that cannot
 * be decoded returns -1 with terrno set to TSDB_CODE_INVALID_MSG; every other
 * failure returns the terrno value describing it.
 */
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SVArbHeartBeatReq arbHbReq = {0};
  SVArbHeartBeatRsp arbHbRsp = {0};
  if (tDeserializeSVArbHeartBeatReq(pMsg->pCont, pMsg->contLen, &arbHbReq) != 0) {
    // Run the same request cleanup every other exit path performs, in case the
    // decoder allocated hbMembers before failing.
    // NOTE(review): confirm the decoder does not already free it on error.
    tFreeSVArbHeartBeatReq(&arbHbReq);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (arbHbReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d not matched with local dnode", arbHbReq.dnodeId);
    goto _OVER;
  }

  if (strlen(arbHbReq.arbToken) == 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("dnodeId:%d arbToken is empty", arbHbReq.dnodeId);
    goto _OVER;
  }

  size_t size = taosArrayGetSize(arbHbReq.hbMembers);

  arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
  tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
  arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
  if (arbHbRsp.hbMembers == NULL) {
    // _OVER returns terrno, so make sure a failure can never be reported as
    // success if the helper left terrno untouched.
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  for (size_t i = 0; i < size; i++) {
    SVArbHbReqMember *pReqMember = taosArrayGet(arbHbReq.hbMembers, i);
    SVnodeObj        *pVnode = vmAcquireVnode(pMgmt, pReqMember->vgId);
    if (pVnode == NULL) {
      dError("dnodeId:%d vgId:%d not found failed to process arb hb req", arbHbReq.dnodeId, pReqMember->vgId);
      continue;
    }

    SVArbHbRspMember rspMember = {0};
    rspMember.vgId = pReqMember->vgId;
    rspMember.hbSeq = pReqMember->hbSeq;
    if (vnodeGetArbToken(pVnode->pImpl, rspMember.memberToken) != 0) {
      dError("dnodeId:%d vgId:%d failed to get arb token", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (vnodeUpdateArbTerm(pVnode->pImpl, arbHbReq.arbTerm) != 0) {
      dError("dnodeId:%d vgId:%d failed to update arb term", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      continue;
    }

    if (taosArrayPush(arbHbRsp.hbMembers, &rspMember) == NULL) {
      dError("dnodeId:%d vgId:%d failed to push arb hb rsp member", arbHbReq.dnodeId, pReqMember->vgId);
      vmReleaseVnode(pMgmt, pVnode);
      if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    vmReleaseVnode(pMgmt, pVnode);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSVArbHeartBeatRsp(NULL, 0, &arbHbRsp);
  if (rspLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (tSerializeSVArbHeartBeatRsp(pRsp, rspLen, &arbHbRsp) <= 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  // The response buffer is handed over through the message info.
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;

  terrno = TSDB_CODE_SUCCESS;

_OVER:
  tFreeSVArbHeartBeatReq(&arbHbReq);
  tFreeSVArbHeartBeatRsp(&arbHbRsp);
  return terrno;
}
952

UNCOV
953
SArray *vmGetMsgHandles() {
×
UNCOV
954
  int32_t code = -1;
×
UNCOV
955
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
×
UNCOV
956
  if (pArray == NULL) goto _OVER;
×
957

UNCOV
958
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
959
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
960
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
961
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
962
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
963
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
964
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
965
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
966
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
967
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
968
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
969
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
970
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
971
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
972
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
973
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
974
  if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
975
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
976
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
977
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
978
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
979
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
980
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
981
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
982
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
983
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
984
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
985
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
986
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
987
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
988
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
989
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
990
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
991
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
992
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
993
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
×
UNCOV
994
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
995
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
996
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
997
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
998
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
999
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
1000
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1001
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1002
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
UNCOV
1003
  if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1004
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
×
1005

UNCOV
1006
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1007
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1008
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
×
UNCOV
1009
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1010
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1011
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1012
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1013
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1014
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1015
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1016
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1017
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1018
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
UNCOV
1019
  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
×
NEW
1020
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
×
1021

UNCOV
1022
  if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
×
NEW
1023
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
×
NEW
1024
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
×
1025

UNCOV
1026
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1027
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
NEW
1028
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
NEW
1029
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
NEW
1030
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
NEW
1031
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
NEW
1032
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
NEW
1033
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
1034

UNCOV
1035
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1036
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1037
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1038
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1039
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1040
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1041
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1042
  if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1043
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1044
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1045
  if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1046
  if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1047
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
1048

UNCOV
1049
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1050
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1051
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1052
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1053
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1054
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1055
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1056
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1057
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1058
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1059
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
UNCOV
1060
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
1061

UNCOV
1062
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
×
UNCOV
1063
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
×
UNCOV
1064
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
×
UNCOV
1065
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
×
UNCOV
1066
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
×
1067

UNCOV
1068
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_HEARTBEAT, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
×
UNCOV
1069
  if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
×
UNCOV
1070
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
×
1071

UNCOV
1072
  code = 0;
×
1073

UNCOV
1074
_OVER:
×
UNCOV
1075
  if (code != 0) {
×
1076
    taosArrayDestroy(pArray);
×
1077
    return NULL;
×
1078
  } else {
UNCOV
1079
    return pArray;
×
1080
  }
1081
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc