• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4488

12 Jul 2025 07:47AM UTC coverage: 62.207% (-0.7%) from 62.948%
#4488

push

travis-ci

web-flow
docs: update stream docs (#31822)

157961 of 324087 branches covered (48.74%)

Branch coverage included in aggregate %.

244465 of 322830 relevant lines covered (75.73%)

6561668.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.43
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
11,249✔
24
  int32_t    diskId = -1;
11,249✔
25
  SVnodeObj *pVnode = NULL;
11,249✔
26

27
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
11,249✔
28
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
11,249✔
29
  if (pVnode != NULL) {
11,249!
30
    diskId = pVnode->diskPrimary;
×
31
  }
32
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
11,249✔
33
  return diskId;
11,248✔
34
}
35

36
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
27,792✔
37
  if (!ppVnode || !(*ppVnode)) return;
27,792!
38

39
  SVnodeObj *pVnode = *ppVnode;
27,792✔
40

41
  int32_t refCount = atomic_load_32(&pVnode->refCount);
27,792✔
42
  while (refCount > 0) {
27,793✔
43
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
1!
44
    taosMsleep(200);
1✔
45
    refCount = atomic_load_32(&pVnode->refCount);
1✔
46
  }
47

48
  taosMemoryFree(pVnode->path);
27,792!
49
  taosMemoryFree(pVnode);
27,792!
50
  ppVnode[0] = NULL;
27,792✔
51
}
52

53
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
11,249✔
54
  int32_t    code = 0;
11,249✔
55
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
11,249!
56
  if (pCreatingVnode == NULL) {
11,249!
57
    dError("failed to alloc vnode since %s", terrstr());
×
58
    return terrno;
×
59
  }
60
  (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
11,249✔
61

62
  pCreatingVnode->vgId = vgId;
11,249✔
63
  pCreatingVnode->diskPrimary = diskId;
11,249✔
64

65
  code = taosThreadRwlockWrlock(&pMgmt->hashLock);
11,249✔
66
  if (code != 0) {
11,249!
67
    taosMemoryFree(pCreatingVnode);
×
68
    return code;
×
69
  }
70

71
  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
11,249✔
72
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
11,249✔
73
  if (code != 0) {
11,249!
74
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
×
75
    taosMemoryFree(pCreatingVnode);
×
76
  }
77

78
  int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
11,249✔
79
  if (r != 0) {
11,249!
80
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
81
  }
82

83
  return code;
11,249✔
84
}
85

86
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
11,249✔
87
  SVnodeObj *pOld = NULL;
11,249✔
88

89
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
11,249✔
90
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
11,249✔
91
  if (r != 0) {
11,249!
92
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
×
93
  }
94
  dTrace("vgId:%d, remove from creating Hash", vgId);
11,249✔
95
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
11,249✔
96
  if (r != 0) {
11,249!
97
    dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
×
98
  }
99
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
11,249✔
100

101
  if (pOld) {
11,249!
102
    dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
11,249✔
103
    vmFreeVnodeObj(&pOld);
11,249✔
104
  }
105

106
_OVER:
×
107
  if (r != 0) {
11,249!
108
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
×
109
  }
110
}
11,249✔
111

112
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
11,248✔
113
  int32_t code = 0;
11,248✔
114
  STfs   *pTfs = pMgmt->pTfs;
11,248✔
115
  int32_t diskId = 0;
11,248✔
116
  if (!pTfs) {
11,248!
117
    return diskId;
×
118
  }
119

120
  // search fs
121
  char vnodePath[TSDB_FILENAME_LEN] = {0};
11,248✔
122
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
11,248✔
123
  char fname[TSDB_FILENAME_LEN] = {0};
11,248✔
124
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
11,248✔
125
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
11,248✔
126
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
11,248✔
127

128
  diskId = tfsSearch(pTfs, 0, fname);
11,248✔
129
  if (diskId >= 0) {
11,218!
130
    return diskId;
×
131
  }
132
  diskId = tfsSearch(pTfs, 0, fnameTmp);
11,218✔
133
  if (diskId >= 0) {
11,233!
134
    return diskId;
×
135
  }
136

137
  // alloc
138
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
11,233✔
139
  int32_t     numOfVnodes = 0;
11,233✔
140
  SVnodeObj **ppVnodes = NULL;
11,233✔
141

142
  code = taosThreadMutexLock(&pMgmt->mutex);
11,233✔
143
  if (code != 0) {
11,249!
144
    return code;
×
145
  }
146

147
  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
11,249✔
148
  if (code != 0) {
11,249!
149
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
150
    if (r != 0) {
×
151
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
152
    }
153
    return code;
×
154
  }
155

156
  for (int32_t v = 0; v < numOfVnodes; v++) {
42,919✔
157
    SVnodeObj *pVnode = ppVnodes[v];
31,670✔
158
    disks[pVnode->diskPrimary] += 1;
31,670✔
159
  }
160

161
  int32_t minVal = INT_MAX;
11,249✔
162
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
11,249✔
163
  diskId = 0;
11,249✔
164
  for (int32_t id = 0; id < ndisk; id++) {
22,597✔
165
    if (minVal > disks[id]) {
11,348✔
166
      minVal = disks[id];
11,261✔
167
      diskId = id;
11,261✔
168
    }
169
  }
170
  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
11,249✔
171
  if (code != 0) {
11,249!
172
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
173
    if (r != 0) {
×
174
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
175
    }
176
    goto _OVER;
×
177
  }
178

179
  code = taosThreadMutexUnlock(&pMgmt->mutex);
11,249✔
180
  if (code != 0) {
11,249!
181
    goto _OVER;
×
182
  }
183

184
_OVER:
11,249✔
185

186
  for (int32_t i = 0; i < numOfVnodes; ++i) {
42,919✔
187
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
31,670!
188
    vmReleaseVnode(pMgmt, ppVnodes[i]);
31,670✔
189
  }
190
  if (ppVnodes != NULL) {
11,249!
191
    taosMemoryFree(ppVnodes);
11,249!
192
  }
193

194
  if (code != 0) {
11,249!
195
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
×
196
    return code;
×
197
  } else {
198
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
11,249!
199
    return diskId;
11,249✔
200
  }
201
}
202

203
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); }
11,249✔
204

205
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
12,104,821✔
206
  SVnodeObj *pVnode = NULL;
12,104,821✔
207

208
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
12,104,821✔
209
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
12,114,896✔
210
  if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
12,113,881!
211
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
50,862✔
212
    pVnode = NULL;
51,348✔
213
  } else {
214
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
12,063,019✔
215
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
12,063,164✔
216
  }
217
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
12,114,515✔
218

219
  return pVnode;
12,115,093✔
220
}
221

222
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVnodeImpl(pMgmt, vgId, true); }
12,083,853✔
223

224
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
12,180,650✔
225
  if (pVnode == NULL) return;
12,180,650!
226

227
  //(void)taosThreadRwlockRdlock(&pMgmt->lock);
228
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
12,180,650✔
229
  dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
12,185,139✔
230
  //(void)taosThreadRwlockUnlock(&pMgmt->lock);
231
}
232

233
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
14,528✔
234
  SVnodeObj *pOld = NULL;
14,528✔
235
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);
14,528!
236

237
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
14,528✔
238
  if (r != 0) {
14,528!
239
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
×
240
  }
241
  if (pOld) {
14,528!
242
    vmFreeVnodeObj(&pOld);
×
243
  }
244
  int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
14,528✔
245

246
  return code;
14,528✔
247
}
248

249
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
14,528✔
250
  dInfo("vgId:%d, remove from hash", vgId);
14,528!
251
  int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
14,528✔
252
  if (r != 0) {
14,528!
253
    dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r));
×
254
  }
255
}
14,528✔
256

257
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
2,015✔
258
  int32_t    code = 0;
2,015✔
259
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
2,015!
260
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
2,015!
261
  if (pClosedVnode == NULL) {
2,015!
262
    dError("failed to alloc vnode since %s", terrstr());
×
263
    return terrno;
×
264
  }
265
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
2,015✔
266

267
  pClosedVnode->vgId = pVnode->vgId;
2,015✔
268
  pClosedVnode->dropped = pVnode->dropped;
2,015✔
269
  pClosedVnode->vgVersion = pVnode->vgVersion;
2,015✔
270
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
2,015✔
271
  pClosedVnode->toVgId = pVnode->toVgId;
2,015✔
272
  pClosedVnode->mountId = pVnode->mountId;
2,015✔
273

274
  SVnodeObj *pOld = NULL;
2,015✔
275
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
2,015✔
276
  if (r != 0) {
2,015!
277
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
278
  }
279
  if (pOld) {
2,015!
280
    vmFreeVnodeObj(&pOld);
×
281
  }
282
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
2,015!
283
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
2,015✔
284
  if (r != 0) {
2,015!
285
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
×
286
  }
287

288
  return code;
2,015✔
289
}
290

291
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
14,528✔
292
  SVnodeObj *pOld = NULL;
14,528✔
293
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);
14,528!
294
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
14,528✔
295
  if (r != 0) {
14,528!
296
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
297
  }
298
  if (pOld != NULL) {
14,528✔
299
    vmFreeVnodeObj(&pOld);
2,015✔
300
    dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
2,015!
301
    r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
2,015✔
302
    if (r != 0) {
2,015!
303
      dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
×
304
    }
305
  }
306
}
14,528✔
307
#ifdef USE_MOUNT
308
int32_t vmAcquireMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, const char* mountName, const char *mountPath, STfs **ppTfs) {
×
309
  int32_t    code = 0, lino = 0;
×
310
  TdFilePtr  pFile = NULL;
×
311
  SArray    *pDisks = NULL;
×
312
  SMountTfs *pMountTfs = NULL;
×
313
  bool       unlock = false;
×
314

315
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
×
316
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
×
317
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
×
318
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
319
    }
320
    atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
×
321
    TAOS_RETURN(code);
×
322
  }
323
  if (!mountPath || mountPath[0] == 0 || mountId == 0) {
×
324
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
325
  }
326
  (void)(taosThreadMutexLock(&pMgmt->mutex));
×
327
  unlock = true;
×
328
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
×
329
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
×
330
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
×
331
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
332
    }
333
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
×
334
    atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
×
335
    TAOS_RETURN(code);
×
336
  }
337

338
  TAOS_CHECK_EXIT(vmMountCheckRunning(mountName, mountPath, &pFile, 3));
×
339
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, mountPath, &pDisks));
×
340
  int32_t numOfDisks = taosArrayGetSize(pDisks);
×
341
  if (numOfDisks <= 0) {
×
342
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
343
  }
344
  TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
×
345
  if (mountName) (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", mountName);
×
346
  if (mountPath) (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", mountPath);
×
347
  pMountTfs->pFile = pFile;
×
348
  atomic_store_32(&pMountTfs->nRef, 2);  // init and acquire
×
349
  TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), numOfDisks, &pMountTfs->pTfs));
×
350
  TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &mountId, sizeof(mountId), &pMountTfs, POINTER_BYTES));
×
351
_exit:
×
352
  if (unlock) {
×
353
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
×
354
  }
355
  taosArrayDestroy(pDisks);
×
356
  if (code != 0) {
×
357
    dError("mount:%" PRIi64 ",%s, failed at line %d to get mount tfs since %s", mountId, mountPath ? mountPath : "NULL",
×
358
           lino, tstrerror(code));
359
    if (pFile) {
×
360
      (void)taosUnLockFile(pFile);
×
361
      (void)taosCloseFile(&pFile);
×
362
    }
363
    if (pMountTfs) {
×
364
      tfsClose(pMountTfs->pTfs);
×
365
      taosMemoryFree(pMountTfs);
×
366
    }
367
    *ppTfs = NULL;
×
368
  } else {
369
    *ppTfs = pMountTfs->pTfs;
×
370
  }
371

372
  TAOS_RETURN(code);
×
373
}
374
#endif
375

376
bool vmReleaseMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, int32_t minRef) {
×
377
#ifdef USE_MOUNT
378
  SMountTfs *pMountTfs = NULL;
×
379
  int32_t    nRef = INT32_MAX;
×
380

381
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
×
382
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
×
383
    if ((nRef = atomic_sub_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1)) <= minRef) {
×
384
      (void)(taosThreadMutexLock(&pMgmt->mutex));
×
385
      SMountTfs *pTmp = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
×
386
      if (pTmp && *(SMountTfs **)pTmp) {
×
387
        dInfo("mount:%" PRIi64 ", ref:%d, release mount tfs", mountId, nRef);
×
388
        tfsClose((*(SMountTfs **)pTmp)->pTfs);
×
389
        if ((*(SMountTfs **)pTmp)->pFile) {
×
390
          (void)taosUnLockFile((*(SMountTfs **)pTmp)->pFile);
×
391
          (void)taosCloseFile(&(*(SMountTfs **)pTmp)->pFile);
×
392
        }
393
        taosMemoryFree(*(SMountTfs **)pTmp);
×
394
        taosHashRemove(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
×
395
      }
396
      (void)taosThreadMutexUnlock(&pMgmt->mutex);
×
397
      return true;
×
398
    }
399
  }
400
#endif
401
  return false;
×
402
}
403

404

405
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
14,528✔
406
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
14,528!
407
  if (pVnode == NULL) {
14,528!
408
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
409
    return -1;
×
410
  }
411

412
  pVnode->vgId = pCfg->vgId;
14,528✔
413
  pVnode->vgVersion = pCfg->vgVersion;
14,528✔
414
  pVnode->diskPrimary = pCfg->diskPrimary;
14,528✔
415
  pVnode->mountId = pCfg->mountId;
14,528✔
416
  pVnode->refCount = 0;
14,528✔
417
  pVnode->dropped = 0;
14,528✔
418
  pVnode->failed = 0;
14,528✔
419
  pVnode->path = taosStrdup(pCfg->path);
14,528!
420
  pVnode->pImpl = pImpl;
14,528✔
421

422
  if (pVnode->path == NULL) {
14,528!
423
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
424
    taosMemoryFree(pVnode);
×
425
    return -1;
×
426
  }
427

428
  if (pImpl) {
14,528!
429
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
14,528!
430
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
431
      taosMemoryFree(pVnode->path);
×
432
      taosMemoryFree(pVnode);
×
433
      return -1;
×
434
    }
435
  } else {
436
    pVnode->failed = 1;
×
437
  }
438

439
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
14,528✔
440
  int32_t code = vmRegisterRunningState(pMgmt, pVnode);
14,528✔
441
  vmUnRegisterClosedState(pMgmt, pVnode);
14,528✔
442
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
14,528✔
443

444
  TAOS_RETURN(code);
14,528✔
445
}
446

447
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
14,528✔
448
  char path[TSDB_FILENAME_LEN] = {0};
14,528✔
449
  bool atExit = true;
14,528✔
450

451
  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
14,528✔
452
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
11,733✔
453
  }
454

455
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
14,524✔
456
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
14,528✔
457
  if (keepClosed) {
14,528✔
458
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
2,015!
459
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
460
      return;
×
461
    };
462
  }
463
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
14,528✔
464

465
  vmReleaseVnode(pMgmt, pVnode);
14,528✔
466

467
  if (pVnode->failed) {
14,528!
468
    goto _closed;
×
469
  }
470
  dInfo("vgId:%d, pre close", pVnode->vgId);
14,528!
471
  vnodePreClose(pVnode->pImpl);
14,528✔
472

473
  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
14,524!
474
  while (pVnode->refCount > 0) taosMsleep(10);
14,528!
475

476
  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
14,528!
477
        taosQueueGetThreadId(pVnode->pWriteW.queue));
478
  tMultiWorkerCleanup(&pVnode->pWriteW);
14,528✔
479

480
  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
14,528!
481
        taosQueueGetThreadId(pVnode->pSyncW.queue));
482
  tMultiWorkerCleanup(&pVnode->pSyncW);
14,528✔
483

484
  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
14,527!
485
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
486
  tMultiWorkerCleanup(&pVnode->pSyncRdW);
14,528✔
487

488
  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
14,528!
489
        taosQueueGetThreadId(pVnode->pApplyW.queue));
490
  tMultiWorkerCleanup(&pVnode->pApplyW);
14,528✔
491

492
  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
14,526!
493
        taosQueueGetThreadId(pVnode->pFetchQ));
494
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
14,528!
495

496
  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
14,528!
497
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
14,528!
498

499
  tqNotifyClose(pVnode->pImpl->pTq);
14,528✔
500

501
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
14,528!
502
        pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
503
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);
14,528!
504

505
  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
14,528!
506
  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);
14,528!
507

508
  dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
14,528!
509
        pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
510
  while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
14,528!
511

512
  dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ);
14,528!
513
  while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10);
14,528!
514

515
  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
14,528!
516

517
  dInfo("vgId:%d, post close", pVnode->vgId);
14,528!
518
  vnodePostClose(pVnode->pImpl);
14,528✔
519

520
  vmFreeQueue(pMgmt, pVnode);
14,528✔
521

522
  if (commitAndRemoveWal) {
14,528✔
523
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
32!
524
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
32!
525
      dError("vgId:%d, failed to commit data", pVnode->vgId);
×
526
    }
527
    if (vnodeBegin(pVnode->pImpl) != 0) {
32!
528
      dError("vgId:%d, failed to begin", pVnode->vgId);
×
529
    }
530
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
32!
531
  }
532

533
  int32_t nodeId = vnodeNodeId(pVnode->pImpl);
14,528✔
534
  vnodeClose(pVnode->pImpl);
14,527✔
535
  pVnode->pImpl = NULL;
14,528✔
536

537
_closed:
14,528✔
538
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);
14,528!
539

540
  if (commitAndRemoveWal) {
14,528✔
541
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
32✔
542
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
32!
543
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
32!
544
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
×
545
    }
546
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
32!
547
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
×
548
    }
549
  }
550

551
  if (pVnode->dropped) {
14,528✔
552
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
4,616!
553
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
4,616✔
554
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
4,616✔
555
  }
556
  if (pVnode->mountId && vmReleaseMountTfs(pMgmt, pVnode->mountId, pVnode->dropped ? 1 : 0)) {
14,528!
557
    vmWriteMountListToFile(pMgmt);
×
558
  }
559

560
  vmFreeVnodeObj(&pVnode);
14,528✔
561
}
562

563
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
×
564
  int32_t r = 0;
×
565
  r = taosThreadRwlockWrlock(&pMgmt->hashLock);
×
566
  if (r != 0) {
×
567
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
×
568
  }
569
  if (r == 0) {
×
570
    vmUnRegisterRunningState(pMgmt, vgId);
×
571
  }
572
  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
×
573
  if (r != 0) {
×
574
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
575
  }
576
}
×
577

578
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
×
579
  int32_t srcVgId = pCfg->vgId;
×
580
  int32_t dstVgId = pCfg->toVgId;
×
581
  if (dstVgId == 0) return 0;
×
582

583
  char srcPath[TSDB_FILENAME_LEN];
584
  char dstPath[TSDB_FILENAME_LEN];
585

586
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
×
587
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
×
588

589
  int32_t diskPrimary = pCfg->diskPrimary;
×
590
  int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
×
591
  if (vgId <= 0) {
×
592
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
×
593
    return -1;
×
594
  }
595

596
  pCfg->vgId = vgId;
×
597
  pCfg->toVgId = 0;
×
598
  return 0;
×
599
}
600

601
static void *vmOpenVnodeInThread(void *param) {
1,216✔
602
  SVnodeThread *pThread = param;
1,216✔
603
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,216✔
604
  char          path[TSDB_FILENAME_LEN];
605

606
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,216!
607
  setThreadName("open-vnodes");
1,216✔
608

609
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,447✔
610
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,232✔
611
    if (pCfg->dropped) {
1,232!
612
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
613
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
614
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
615
      tmsgReportStartup("vnode-destroy", stepDesc);
×
616

617
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
618
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
619
      pThread->updateVnodesList = true;
×
620
      pThread->dropped++;
×
621
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
622
      continue;
×
623
    }
624

625
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,232✔
626
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,232✔
627
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
628
    tmsgReportStartup("vnode-open", stepDesc);
1,232✔
629

630
    if (pCfg->toVgId) {
1,232!
631
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
632
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
633
        pThread->failed++;
×
634
        continue;
×
635
      }
636
      pThread->updateVnodesList = true;
×
637
    }
638

639
    int32_t diskPrimary = pCfg->mountId == 0 ? pCfg->diskPrimary : 0;
1,232!
640
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,232✔
641

642
    STfs *pMountTfs = NULL;
1,232✔
643
#ifdef USE_MOUNT
644
    bool releaseTfs = false;
1,232✔
645
    if (pCfg->mountId) {
1,232!
646
      if (vmAcquireMountTfs(pMgmt, pCfg->mountId, NULL, NULL, &pMountTfs) != 0) {
×
647
        dError("vgId:%d, failed to get mount tfs by thread:%d", pCfg->vgId, pThread->threadIndex);
×
648
        pThread->failed++;
×
649
        continue;
×
650
      }
651
      releaseTfs = true;
×
652
    }
653
#endif
654

655
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, false);
1,232✔
656

657
    if (pImpl == NULL) {
1,232!
658
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
659
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
660
        pThread->failed++;
×
661
#ifdef USE_MOUNT
662
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
663
#endif
664
        continue;
×
665
      }
666
    }
667

668
    if (pImpl != NULL) {
1,232!
669
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,232!
670
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
671
        pThread->failed++;
×
672
#ifdef USE_MOUNT
673
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
674
#endif
675
        continue;
×
676
      }
677
    }
678

679
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,232!
680
    pThread->opened++;
1,232✔
681
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,232✔
682
  }
683

684
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,215!
685
        pThread->opened, pThread->dropped, pThread->failed);
686
  return NULL;
1,216✔
687
}
688

689
#ifdef USE_MOUNT
690
static int32_t vmOpenMountTfs(SVnodeMgmt *pMgmt) {
3,021✔
691
  int32_t    code = 0, lino = 0;
3,021✔
692
  int32_t    numOfMounts = 0;
3,021✔
693
  SMountCfg *pMountCfgs = NULL;
3,021✔
694
  SArray    *pDisks = NULL;
3,021✔
695
  TdFilePtr  pFile = NULL;
3,021✔
696
  SMountTfs *pMountTfs = NULL;
3,021✔
697

698
  TAOS_CHECK_EXIT(vmGetMountListFromFile(pMgmt, &pMountCfgs, &numOfMounts));
3,021!
699
  for (int32_t i = 0; i < numOfMounts; ++i) {
3,021!
700
    SMountCfg *pCfg = &pMountCfgs[i];
×
701
    if (taosHashGet(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId))) {
×
702
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
703
    }
704
    TAOS_CHECK_EXIT(vmMountCheckRunning(pCfg->name, pCfg->path, &pFile, 3));
×
705
    TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pCfg->path, &pDisks));
×
706
    int32_t nDisks = taosArrayGetSize(pDisks);
×
707
    if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
×
708
      dError("mount:%s, %" PRIi64 ", %s, invalid number of disks:%d, expect 1 to %d", pCfg->name, pCfg->mountId,
×
709
             pCfg->path, nDisks, TFS_MAX_DISKS);
710
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
711
    }
712

713
    TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
×
714
    TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), TARRAY_SIZE(pDisks), &pMountTfs->pTfs));
×
715
    (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", pCfg->name);
×
716
    (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", pCfg->path);
×
717
    pMountTfs->pFile = pFile;
×
718
    pMountTfs->nRef = 1;
×
719
    TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId), &pMountTfs, POINTER_BYTES));
×
720
    taosArrayDestroy(pDisks);
×
721
    pDisks = NULL;
×
722
    pMountTfs = NULL;
×
723
    pFile = NULL;
×
724
  }
725
_exit:
3,021✔
726
  if (code != 0) {
3,021!
727
    dError("failed to open mount tfs at line %d since %s", lino, tstrerror(code));
×
728
    if(pFile) {
×
729
      (void)taosUnLockFile(pFile);
×
730
      (void)taosCloseFile(&pFile);
×
731
    }
732
    if (pMountTfs) {
×
733
      tfsClose(pMountTfs->pTfs);
×
734
      taosMemoryFree(pMountTfs);
×
735
    }
736
    taosArrayDestroy(pDisks);
×
737
  }
738
  taosMemoryFree(pMountCfgs);
3,021!
739
  TAOS_RETURN(code);
3,021✔
740
}
741
#endif
742
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
3,021✔
743
  pMgmt->runngingHash =
3,021✔
744
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
3,021✔
745
  if (pMgmt->runngingHash == NULL) {
3,021!
746
    dError("failed to init vnode hash since %s", terrstr());
×
747
    return TSDB_CODE_OUT_OF_MEMORY;
×
748
  }
749

750
  pMgmt->closedHash =
3,021✔
751
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
3,021✔
752
  if (pMgmt->closedHash == NULL) {
3,021!
753
    dError("failed to init vnode closed hash since %s", terrstr());
×
754
    return TSDB_CODE_OUT_OF_MEMORY;
×
755
  }
756

757
  pMgmt->creatingHash =
3,021✔
758
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
3,021✔
759
  if (pMgmt->creatingHash == NULL) {
3,021!
760
    dError("failed to init vnode creatingHash hash since %s", terrstr());
×
761
    return TSDB_CODE_OUT_OF_MEMORY;
×
762
  }
763

764
  SWrapperCfg *pCfgs = NULL;
3,021✔
765
  int32_t      numOfVnodes = 0;
3,021✔
766
  int32_t      code = 0;
3,021✔
767
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
3,021!
768
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
×
769
    return code;
×
770
  }
771

772
  pMgmt->state.totalVnodes = numOfVnodes;
3,021✔
773

774
  int32_t threadNum = tsNumOfCores / 2;
3,021✔
775
  if (threadNum < 1) threadNum = 1;
3,021!
776
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
3,021✔
777

778
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
3,021!
779
  if (threads == NULL) {
3,021!
780
    dError("failed to allocate memory for threads since %s", terrstr());
×
781
    taosMemoryFree(pCfgs);
×
782
    return terrno;
×
783
  }
784

785
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
786
    threads[t].threadIndex = t;
60,420✔
787
    threads[t].pMgmt = pMgmt;
60,420✔
788
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
60,420!
789
  }
790

791
  for (int32_t v = 0; v < numOfVnodes; ++v) {
4,253✔
792
    int32_t       t = v % threadNum;
1,232✔
793
    SVnodeThread *pThread = &threads[t];
1,232✔
794
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
1,232✔
795
  }
796

797
  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);
3,021!
798

799
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
800
    SVnodeThread *pThread = &threads[t];
60,420✔
801
    if (pThread->vnodeNum == 0) continue;
60,420✔
802

803
    TdThreadAttr thAttr;
804
    (void)taosThreadAttrInit(&thAttr);
1,216✔
805
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,216✔
806
#ifdef TD_COMPACT_OS
807
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
808
#endif
809
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
1,216!
810
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
×
811
    }
812

813
    (void)taosThreadAttrDestroy(&thAttr);
1,216✔
814
  }
815

816
  bool updateVnodesList = false;
3,021✔
817

818
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
819
    SVnodeThread *pThread = &threads[t];
60,420✔
820
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
60,420!
821
      (void)taosThreadJoin(pThread->thread, NULL);
1,216✔
822
      taosThreadClear(&pThread->thread);
1,216✔
823
    }
824
    taosMemoryFree(pThread->pCfgs);
60,420!
825
    if (pThread->updateVnodesList) updateVnodesList = true;
60,420!
826
  }
827
  taosMemoryFree(threads);
3,021!
828
  taosMemoryFree(pCfgs);
3,021!
829

830
  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
3,021!
831
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
×
832
    return terrno = TSDB_CODE_VND_INIT_FAILED;
×
833
  }
834

835
  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
3,021!
836
    dError("failed to write vnode list since %s", tstrerror(code));
×
837
    return code;
×
838
  }
839

840
#ifdef USE_MOUNT
841
  bool  updateMountList = false;
3,021✔
842
  void *pIter = NULL;
3,021✔
843
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
3,021!
844
    SMountTfs *pMountTfs = *(SMountTfs **)pIter;
×
845
    if (pMountTfs && atomic_load_32(&pMountTfs->nRef) <= 1) {
×
846
      size_t  keyLen = 0;
×
847
      int64_t mountId = *(int64_t *)taosHashGetKey(pIter, &keyLen);
×
848
      dInfo("mount:%s, %s, %" PRIi64 ", ref:%d, remove unused mount tfs", pMountTfs->name, pMountTfs->path, mountId,
×
849
            atomic_load_32(&pMountTfs->nRef));
850
      if(pMountTfs->pFile) {
×
851
        (void)taosUnLockFile(pMountTfs->pFile);
×
852
        (void)taosCloseFile(&pMountTfs->pFile);
×
853
      }
854
      tfsClose(pMountTfs->pTfs);
×
855
      taosMemoryFree(pMountTfs);
×
856
      taosHashRemove(pMgmt->mountTfsHash, &mountId, keyLen);
×
857
      updateMountList = true;
×
858
    }
859
  }
860
  if (updateMountList && (code = vmWriteMountListToFile(pMgmt)) != 0) {
3,021!
861
    dError("failed to write mount list at line %d since %s", __LINE__, tstrerror(code));
×
862
    return code;
×
863
  }
864
#endif
865

866
  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
3,021!
867
  return 0;
3,021✔
868
}
869

870
static void *vmCloseVnodeInThread(void *param) {
7,728✔
871
  SVnodeThread *pThread = param;
7,728✔
872
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
7,728✔
873

874
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
7,728✔
875
  setThreadName("close-vnodes");
7,729✔
876

877
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
15,593✔
878
    SVnodeObj *pVnode = pThread->ppVnodes[v];
7,865✔
879

880
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
7,865✔
881
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
7,865✔
882
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
883
    tmsgReportStartup("vnode-close", stepDesc);
7,865✔
884

885
    vmCloseVnode(pMgmt, pVnode, false, false);
7,864✔
886
  }
887

888
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
7,728!
889
  return NULL;
7,728✔
890
}
891

892
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
3,021✔
893
  int32_t code = 0;
3,021✔
894
  dInfo("start to close all vnodes");
3,021!
895
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
3,021✔
896
  dInfo("vnodes mgmt worker is stopped");
3,021!
897
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
3,021✔
898
  dInfo("vnodes multiple mgmt worker is stopped");
3,021!
899

900
  int32_t     numOfVnodes = 0;
3,021✔
901
  SVnodeObj **ppVnodes = NULL;
3,021✔
902
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
3,021✔
903
  if (code != 0) {
3,021!
904
    dError("failed to get vnode list since %s", tstrerror(code));
×
905
    return;
×
906
  }
907

908
  int32_t threadNum = tsNumOfCores / 2;
3,021✔
909
  if (threadNum < 1) threadNum = 1;
3,021!
910
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
3,021✔
911

912
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
3,021!
913
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
914
    threads[t].threadIndex = t;
60,420✔
915
    threads[t].pMgmt = pMgmt;
60,420✔
916
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
60,420!
917
  }
918

919
  for (int32_t v = 0; v < numOfVnodes; ++v) {
10,886✔
920
    int32_t       t = v % threadNum;
7,865✔
921
    SVnodeThread *pThread = &threads[t];
7,865✔
922
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
7,865!
923
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
7,865✔
924
    }
925
  }
926

927
  pMgmt->state.openVnodes = 0;
3,021✔
928
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
3,021!
929

930
  int64_t st = taosGetTimestampMs();
3,021✔
931
  dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
3,021!
932
  if (ppVnodes != NULL) {
3,021!
933
    for (int32_t i = 0; i < numOfVnodes; ++i) {
10,886✔
934
      if (ppVnodes[i] != NULL) {
7,865!
935
        if (ppVnodes[i]->pImpl != NULL) {
7,865!
936
          tqNotifyClose(ppVnodes[i]->pImpl->pTq);
7,865✔
937
        }
938
      }
939
    }
940
  }
941

942
  int64_t et = taosGetTimestampMs();
3,021✔
943
  dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);
3,021!
944

945
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
946
    SVnodeThread *pThread = &threads[t];
60,420✔
947
    if (pThread->vnodeNum == 0) continue;
60,420✔
948

949
    TdThreadAttr thAttr;
950
    (void)taosThreadAttrInit(&thAttr);
7,728✔
951
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
7,728✔
952
#ifdef TD_COMPACT_OS
953
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
954
#endif
955
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
7,728!
956
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
957
    }
958

959
    (void)taosThreadAttrDestroy(&thAttr);
7,728✔
960
  }
961

962
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
963
    SVnodeThread *pThread = &threads[t];
60,420✔
964
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
60,420!
965
      (void)taosThreadJoin(pThread->thread, NULL);
7,728✔
966
      taosThreadClear(&pThread->thread);
7,728✔
967
    }
968
    taosMemoryFree(pThread->ppVnodes);
60,420!
969
  }
970
  taosMemoryFree(threads);
3,021!
971

972
  if (ppVnodes != NULL) {
3,021!
973
    taosMemoryFree(ppVnodes);
3,021!
974
  }
975

976
  if (pMgmt->runngingHash != NULL) {
3,021!
977
    taosHashCleanup(pMgmt->runngingHash);
3,021✔
978
    pMgmt->runngingHash = NULL;
3,021✔
979
  }
980

981
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
3,021✔
982
  while (pIter) {
3,021!
983
    SVnodeObj **ppVnode = pIter;
×
984
    vmFreeVnodeObj(ppVnode);
×
985
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
×
986
  }
987

988
  if (pMgmt->closedHash != NULL) {
3,021!
989
    taosHashCleanup(pMgmt->closedHash);
3,021✔
990
    pMgmt->closedHash = NULL;
3,021✔
991
  }
992

993
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
3,021✔
994
  while (pIter) {
3,021!
995
    SVnodeObj **ppVnode = pIter;
×
996
    vmFreeVnodeObj(ppVnode);
×
997
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
×
998
  }
999

1000
  if (pMgmt->creatingHash != NULL) {
3,021!
1001
    taosHashCleanup(pMgmt->creatingHash);
3,021✔
1002
    pMgmt->creatingHash = NULL;
3,021✔
1003
  }
1004

1005
#ifdef USE_MOUNT
1006
  pIter = NULL;
3,021✔
1007
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
3,021!
1008
    SMountTfs *mountTfs = *(SMountTfs **)pIter;
×
1009
    if(mountTfs->pFile) {
×
1010
      (void)taosUnLockFile(mountTfs->pFile);
×
1011
      (void)taosCloseFile(&mountTfs->pFile);
×
1012
    }
1013
    tfsClose(mountTfs->pTfs);
×
1014
    taosMemoryFree(mountTfs);
×
1015
  }
1016
  taosHashCleanup(pMgmt->mountTfsHash);
3,021✔
1017
  pMgmt->mountTfsHash = NULL;
3,021✔
1018
#endif
1019

1020
  dInfo("total vnodes:%d are all closed", numOfVnodes);
3,021!
1021
}
1022

1023
static void vmCleanup(SVnodeMgmt *pMgmt) {
3,021✔
1024
  vmCloseVnodes(pMgmt);
3,021✔
1025
  vmStopWorker(pMgmt);
3,021✔
1026
  vnodeCleanup();
3,021✔
1027
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
3,021✔
1028
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
3,021✔
1029
  taosMemoryFree(pMgmt);
3,021!
1030
}
3,021✔
1031

1032
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
2,642✔
1033
  int32_t     code = 0;
2,642✔
1034
  int32_t     numOfVnodes = 0;
2,642✔
1035
  SVnodeObj **ppVnodes = NULL;
2,642✔
1036
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
2,642✔
1037
  if (code != 0) {
2,642!
1038
    dError("failed to get vnode list since %s", tstrerror(code));
×
1039
    return;
×
1040
  }
1041

1042
  if (ppVnodes != NULL) {
2,642!
1043
    for (int32_t i = 0; i < numOfVnodes; ++i) {
9,309✔
1044
      SVnodeObj *pVnode = ppVnodes[i];
6,667✔
1045
      if (!pVnode->failed) {
6,667!
1046
        vnodeSyncCheckTimeout(pVnode->pImpl);
6,667✔
1047
      }
1048
      vmReleaseVnode(pMgmt, pVnode);
6,667✔
1049
    }
1050
    taosMemoryFree(ppVnodes);
2,642!
1051
  }
1052
}
1053

1054
static void *vmThreadFp(void *param) {
3,021✔
1055
  SVnodeMgmt *pMgmt = param;
3,021✔
1056
  int64_t     lastTime = 0;
3,021✔
1057
  setThreadName("vnode-timer");
3,021✔
1058

1059
  while (1) {
1,122,754✔
1060
    lastTime++;
1,125,775✔
1061
    taosMsleep(100);
1,125,775✔
1062
    if (pMgmt->stop) break;
1,125,775✔
1063
    if (lastTime % 10 != 0) continue;
1,122,754✔
1064

1065
    int64_t sec = lastTime / 10;
110,937✔
1066
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
110,937✔
1067
      vmCheckSyncTimeout(pMgmt);
2,642✔
1068
    }
1069
  }
1070

1071
  return NULL;
3,021✔
1072
}
1073

1074
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
3,021✔
1075
  int32_t      code = 0;
3,021✔
1076
  TdThreadAttr thAttr;
1077
  (void)taosThreadAttrInit(&thAttr);
3,021✔
1078
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
3,021✔
1079
#ifdef TD_COMPACT_OS
1080
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
1081
#endif
1082
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
3,021!
1083
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1084
    dError("failed to create vnode timer thread since %s", tstrerror(code));
×
1085
    return code;
×
1086
  }
1087

1088
  (void)taosThreadAttrDestroy(&thAttr);
3,021✔
1089
  return 0;
3,021✔
1090
}
1091

1092
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
3,021✔
1093
  pMgmt->stop = true;
3,021✔
1094
  if (taosCheckPthreadValid(pMgmt->thread)) {
3,021!
1095
    (void)taosThreadJoin(pMgmt->thread, NULL);
3,021✔
1096
    taosThreadClear(&pMgmt->thread);
3,021✔
1097
  }
1098
}
3,021✔
1099

1100
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
3,021✔
1101
  int32_t code = -1;
3,021✔
1102

1103
  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
3,021!
1104
  if (pMgmt == NULL) {
3,021!
1105
    code = terrno;
×
1106
    goto _OVER;
×
1107
  }
1108

1109
  pMgmt->pData = pInput->pData;
3,021✔
1110
  pMgmt->path = pInput->path;
3,021✔
1111
  pMgmt->name = pInput->name;
3,021✔
1112
  pMgmt->msgCb = pInput->msgCb;
3,021✔
1113
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
3,021✔
1114
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
3,021✔
1115
  pMgmt->msgCb.mgmt = pMgmt;
3,021✔
1116

1117
  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
3,021✔
1118
  if (code != 0) {
3,021!
1119
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1120
    goto _OVER;
×
1121
  }
1122

1123
  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
3,021✔
1124
  if (code != 0) {
3,021!
1125
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1126
    goto _OVER;
×
1127
  }
1128

1129
  pMgmt->pTfs = pInput->pTfs;
3,021✔
1130
  if (pMgmt->pTfs == NULL) {
3,021!
1131
    dError("tfs is null.");
×
1132
    goto _OVER;
×
1133
  }
1134
#ifdef USE_MOUNT
1135
  if (!(pMgmt->mountTfsHash =
3,021!
1136
            taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK))) {
3,021✔
1137
    dError("failed to init mountTfsHash since %s", terrstr());
×
1138
    return TSDB_CODE_OUT_OF_MEMORY;
×
1139
  }
1140
  if ((code = vmOpenMountTfs(pMgmt)) != 0) {
3,021!
1141
    goto _OVER;
×
1142
  }
1143
#endif
1144
  tmsgReportStartup("vnode-tfs", "initialized");
3,021✔
1145
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
3,021!
1146
    dError("failed to init wal since %s", tstrerror(code));
×
1147
    goto _OVER;
×
1148
  }
1149

1150
  tmsgReportStartup("vnode-wal", "initialized");
3,021✔
1151

1152
  if ((code = syncInit()) != 0) {
3,021!
1153
    dError("failed to open sync since %s", tstrerror(code));
×
1154
    goto _OVER;
×
1155
  }
1156
  tmsgReportStartup("vnode-sync", "initialized");
3,021✔
1157

1158
  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
3,021!
1159
    dError("failed to init vnode since %s", tstrerror(code));
×
1160
    goto _OVER;
×
1161
  }
1162
  tmsgReportStartup("vnode-commit", "initialized");
3,021✔
1163

1164
  if ((code = vmStartWorker(pMgmt)) != 0) {
3,021!
1165
    dError("failed to init workers since %s", tstrerror(code));
×
1166
    goto _OVER;
×
1167
  }
1168
  tmsgReportStartup("vnode-worker", "initialized");
3,021✔
1169

1170
  if ((code = vmOpenVnodes(pMgmt)) != 0) {
3,021!
1171
    dError("failed to open all vnodes since %s", tstrerror(code));
×
1172
    goto _OVER;
×
1173
  }
1174
  tmsgReportStartup("vnode-vnodes", "initialized");
3,021✔
1175

1176
  if ((code = udfcOpen()) != 0) {
3,021!
1177
    dError("failed to open udfc in vnode since %s", tstrerror(code));
×
1178
    goto _OVER;
×
1179
  }
1180

1181
  code = 0;
3,021✔
1182

1183
_OVER:
3,021✔
1184
  if (code == 0) {
3,021!
1185
    pOutput->pMgmt = pMgmt;
3,021✔
1186
  } else {
1187
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
×
1188
    vmCleanup(pMgmt);
×
1189
  }
1190

1191
  return code;
3,021✔
1192
}
1193

1194
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
3,063✔
1195
  *required = tsNumOfSupportVnodes > 0;
3,063✔
1196
  return 0;
3,063✔
1197
}
1198

1199
static void *vmRestoreVnodeInThread(void *param) {
1,216✔
1200
  SVnodeThread *pThread = param;
1,216✔
1201
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,216✔
1202

1203
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,216!
1204
  setThreadName("restore-vnodes");
1,216✔
1205

1206
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,448✔
1207
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,232✔
1208
    if (pVnode->failed) {
1,232!
1209
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
1210
      continue;
×
1211
    }
1212

1213
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,232✔
1214
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,232✔
1215
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
1216
    tmsgReportStartup("vnode-restore", stepDesc);
1,232✔
1217

1218
    int32_t code = vnodeStart(pVnode->pImpl);
1,232✔
1219
    if (code != 0) {
1,232!
1220
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1221
      pThread->failed++;
×
1222
    } else {
1223
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,232!
1224
      pThread->opened++;
1,232✔
1225
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,232✔
1226
    }
1227
  }
1228

1229
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,216!
1230
        pThread->failed);
1231
  return NULL;
1,216✔
1232
}
1233

1234
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
3,021✔
1235
  int32_t     code = 0;
3,021✔
1236
  int32_t     numOfVnodes = 0;
3,021✔
1237
  SVnodeObj **ppVnodes = NULL;
3,021✔
1238
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
3,021✔
1239
  if (code != 0) {
3,021!
1240
    dError("failed to get vnode list since %s", tstrerror(code));
×
1241
    return code;
×
1242
  }
1243

1244
  int32_t threadNum = tsNumOfCores / 2;
3,021✔
1245
  if (threadNum < 1) threadNum = 1;
3,021!
1246
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
3,021✔
1247

1248
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
3,021!
1249
  if (threads == NULL) {
3,021!
1250
    return terrno;
×
1251
  }
1252

1253
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
1254
    threads[t].threadIndex = t;
60,420✔
1255
    threads[t].pMgmt = pMgmt;
60,420✔
1256
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
60,420!
1257
    if (threads[t].ppVnodes == NULL) {
60,420!
1258
      code = terrno;
×
1259
      break;
×
1260
    }
1261
  }
1262

1263
  for (int32_t v = 0; v < numOfVnodes; ++v) {
4,253✔
1264
    int32_t       t = v % threadNum;
1,232✔
1265
    SVnodeThread *pThread = &threads[t];
1,232✔
1266
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
1,232!
1267
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
1,232✔
1268
    }
1269
  }
1270

1271
  pMgmt->state.openVnodes = 0;
3,021✔
1272
  pMgmt->state.dropVnodes = 0;
3,021✔
1273
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
3,021!
1274

1275
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
1276
    SVnodeThread *pThread = &threads[t];
60,420✔
1277
    if (pThread->vnodeNum == 0) continue;
60,420✔
1278

1279
    TdThreadAttr thAttr;
1280
    (void)taosThreadAttrInit(&thAttr);
1,216✔
1281
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,216✔
1282
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
1,216!
1283
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
1284
    }
1285

1286
    (void)taosThreadAttrDestroy(&thAttr);
1,216✔
1287
  }
1288

1289
  for (int32_t t = 0; t < threadNum; ++t) {
63,441✔
1290
    SVnodeThread *pThread = &threads[t];
60,420✔
1291
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
60,420!
1292
      (void)taosThreadJoin(pThread->thread, NULL);
1,216✔
1293
      taosThreadClear(&pThread->thread);
1,216✔
1294
    }
1295
    taosMemoryFree(pThread->ppVnodes);
60,420!
1296
  }
1297
  taosMemoryFree(threads);
3,021!
1298

1299
  for (int32_t i = 0; i < numOfVnodes; ++i) {
4,253✔
1300
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
1,232!
1301
    vmReleaseVnode(pMgmt, ppVnodes[i]);
1,232✔
1302
  }
1303

1304
  if (ppVnodes != NULL) {
3,021!
1305
    taosMemoryFree(ppVnodes);
3,021!
1306
  }
1307

1308
  return vmInitTimer(pMgmt);
3,021✔
1309

1310
_exit:
1311
  for (int32_t t = 0; t < threadNum; ++t) {
1312
    SVnodeThread *pThread = &threads[t];
1313
    taosMemoryFree(pThread->ppVnodes);
1314
  }
1315
  taosMemoryFree(threads);
1316
  return code;
1317
}
1318

1319
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
3,021✔
1320

1321
SMgmtFunc vmGetMgmtFunc() {
3,063✔
1322
  SMgmtFunc mgmtFunc = {0};
3,063✔
1323
  mgmtFunc.openFp = vmInit;
3,063✔
1324
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
3,063✔
1325
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
3,063✔
1326
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
3,063✔
1327
  mgmtFunc.requiredFp = vmRequire;
3,063✔
1328
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
3,063✔
1329

1330
  return mgmtFunc;
3,063✔
1331
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc