• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.09
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
2,929,054✔
24
  int32_t    diskId = -1;
2,929,054✔
25
  SVnodeObj *pVnode = NULL;
2,929,054✔
26

27
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
2,926,981✔
28
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
2,934,769✔
29
  if (pVnode != NULL) {
2,934,107!
30
    diskId = pVnode->diskPrimary;
×
31
  }
32
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
2,934,107✔
33
  return diskId;
2,934,080✔
34
}
35

36
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
7,699,262✔
37
  if (!ppVnode || !(*ppVnode)) return;
7,699,262!
38

39
  SVnodeObj *pVnode = *ppVnode;
7,699,262✔
40

41
  int32_t refCount = atomic_load_32(&pVnode->refCount);
7,699,262✔
42
  while (refCount > 0) {
7,699,445✔
43
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
183!
44
    taosMsleep(200);
183✔
45
    refCount = atomic_load_32(&pVnode->refCount);
183✔
46
  }
47

48
  taosMemoryFree(pVnode->path);
7,699,262!
49
  taosMemoryFree(pVnode);
7,698,376!
50
  ppVnode[0] = NULL;
7,699,084✔
51
}
52

53
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
2,935,176✔
54
  int32_t    code = 0;
2,935,176✔
55
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
2,935,176!
56
  if (pCreatingVnode == NULL) {
2,935,176!
57
    dError("failed to alloc vnode since %s", terrstr());
×
58
    return terrno;
×
59
  }
60
  (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
2,935,176!
61

62
  pCreatingVnode->vgId = vgId;
2,935,176✔
63
  pCreatingVnode->diskPrimary = diskId;
2,935,176✔
64

65
  code = taosThreadRwlockWrlock(&pMgmt->hashLock);
2,935,176✔
66
  if (code != 0) {
2,935,176!
67
    taosMemoryFree(pCreatingVnode);
×
68
    return code;
×
69
  }
70

71
  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
2,935,176✔
72
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
2,935,176✔
73
  if (code != 0) {
2,935,176!
74
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
×
75
    taosMemoryFree(pCreatingVnode);
×
76
  }
77

78
  int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
2,935,176✔
79
  if (r != 0) {
2,935,176!
80
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
81
  }
82

83
  return code;
2,935,176✔
84
}
85

86
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
2,936,480✔
87
  SVnodeObj *pOld = NULL;
2,936,480✔
88

89
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
2,936,480✔
90
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
2,936,480✔
91
  if (r != 0) {
2,936,480!
92
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
×
93
  }
94
  dTrace("vgId:%d, remove from creating Hash", vgId);
2,936,480✔
95
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
2,936,480✔
96
  if (r != 0) {
2,936,480✔
97
    dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
1,304!
98
  }
99
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
2,936,480✔
100

101
  if (pOld) {
2,936,480✔
102
    dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
2,935,176✔
103
    vmFreeVnodeObj(&pOld);
2,935,176✔
104
  }
105

106
_OVER:
1,304✔
107
  if (r != 0) {
2,936,302✔
108
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
1,304!
109
  }
110
}
2,936,302✔
111

112
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
2,926,282✔
113
  int32_t code = 0;
2,926,282✔
114
  STfs   *pTfs = pMgmt->pTfs;
2,926,282✔
115
  int32_t diskId = 0;
2,922,785✔
116
  if (!pTfs) {
2,922,785!
117
    return diskId;
×
118
  }
119

120
  // search fs
121
  char vnodePath[TSDB_FILENAME_LEN] = {0};
2,922,785✔
122
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
2,924,535!
123
  char fname[TSDB_FILENAME_LEN] = {0};
2,924,535✔
124
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
2,931,025✔
125
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
2,920,533!
126
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
2,920,533!
127

128
  diskId = tfsSearch(pTfs, 0, fname);
2,920,533✔
129
  if (diskId >= 0) {
2,926,319!
130
    return diskId;
×
131
  }
132
  diskId = tfsSearch(pTfs, 0, fnameTmp);
2,926,319✔
133
  if (diskId >= 0) {
2,934,346!
134
    return diskId;
×
135
  }
136

137
  // alloc
138
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
2,934,346✔
139
  int32_t     numOfVnodes = 0;
2,934,346✔
140
  SVnodeObj **ppVnodes = NULL;
2,934,438✔
141

142
  code = taosThreadMutexLock(&pMgmt->mutex);
2,924,734✔
143
  if (code != 0) {
2,935,176!
144
    return code;
×
145
  }
146

147
  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
2,935,176✔
148
  if (code != 0) {
2,935,176!
149
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
150
    if (r != 0) {
×
151
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
152
    }
153
    return code;
×
154
  }
155

156
  for (int32_t v = 0; v < numOfVnodes; v++) {
10,289,998✔
157
    SVnodeObj *pVnode = ppVnodes[v];
7,354,822✔
158
    disks[pVnode->diskPrimary] += 1;
7,354,822✔
159
  }
160

161
  int32_t minVal = INT_MAX;
2,935,176✔
162
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
2,935,176✔
163
  diskId = 0;
2,935,176✔
164
  for (int32_t id = 0; id < ndisk; id++) {
6,004,918✔
165
    if (minVal > disks[id]) {
3,069,742✔
166
      minVal = disks[id];
2,958,578✔
167
      diskId = id;
2,958,578✔
168
    }
169
  }
170
  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
2,935,176✔
171
  if (code != 0) {
2,935,176!
172
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
173
    if (r != 0) {
×
174
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
175
    }
176
    goto _OVER;
×
177
  }
178

179
  code = taosThreadMutexUnlock(&pMgmt->mutex);
2,935,176✔
180
  if (code != 0) {
2,935,176!
181
    goto _OVER;
×
182
  }
183

184
_OVER:
2,935,176✔
185

186
  for (int32_t i = 0; i < numOfVnodes; ++i) {
10,289,998✔
187
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
7,354,822!
188
    vmReleaseVnode(pMgmt, ppVnodes[i]);
7,354,822✔
189
  }
190
  if (ppVnodes != NULL) {
2,935,176!
191
    taosMemoryFree(ppVnodes);
2,935,176!
192
  }
193

194
  if (code != 0) {
2,935,176!
195
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
×
196
    return code;
×
197
  } else {
198
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
2,935,176!
199
    return diskId;
2,935,176✔
200
  }
201
}
202

203
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); }
2,936,480✔
204

205
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
1,841,699,806✔
206
  SVnodeObj *pVnode = NULL;
1,841,699,806✔
207

208
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
1,841,731,494✔
209
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
1,842,070,376✔
210
  if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
1,841,962,261!
211
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
12,975,189✔
212
    dDebug("vgId:%d, acquire vnode failed.", vgId);
12,974,569✔
213
    pVnode = NULL;
12,974,480✔
214
  } else {
215
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
1,829,039,046✔
216
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
1,829,065,452✔
217
  }
218
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
1,842,039,932✔
219

220
  return pVnode;
1,841,949,279✔
221
}
222

223
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVnodeImpl(pMgmt, vgId, true); }
1,837,303,905✔
224

225
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
1,860,849,637✔
226
  if (pVnode == NULL) return;
1,860,849,637✔
227

228
  //(void)taosThreadRwlockRdlock(&pMgmt->lock);
229
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
1,860,848,333✔
230
  dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
1,860,926,997✔
231
  //(void)taosThreadRwlockUnlock(&pMgmt->lock);
232
}
233

234
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
4,049,180✔
235
  SVnodeObj *pOld = NULL;
4,049,180✔
236
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);
4,049,180!
237

238
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
4,049,180✔
239
  if (r != 0) {
4,049,180!
240
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
×
241
  }
242
  if (pOld) {
4,049,180!
243
    vmFreeVnodeObj(&pOld);
×
244
  }
245
  int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
4,049,180✔
246

247
  return code;
4,049,180✔
248
}
249

250
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
4,049,180✔
251
  dInfo("vgId:%d, remove from hash", vgId);
4,049,180!
252
  int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
4,049,180✔
253
  if (r != 0) {
4,049,180!
254
    dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r));
×
255
  }
256
}
4,049,180✔
257

258
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
714,906✔
259
  int32_t    code = 0;
714,906✔
260
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
714,906!
261
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
714,906!
262
  if (pClosedVnode == NULL) {
714,906!
263
    dError("failed to alloc vnode since %s", terrstr());
×
264
    return terrno;
×
265
  }
266
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
714,906!
267

268
  pClosedVnode->vgId = pVnode->vgId;
714,906✔
269
  pClosedVnode->dropped = pVnode->dropped;
714,906✔
270
  pClosedVnode->vgVersion = pVnode->vgVersion;
714,906✔
271
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
714,906✔
272
  pClosedVnode->toVgId = pVnode->toVgId;
714,906✔
273
  pClosedVnode->mountId = pVnode->mountId;
714,906✔
274

275
  SVnodeObj *pOld = NULL;
714,906✔
276
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
714,906✔
277
  if (r != 0) {
714,906!
278
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
279
  }
280
  if (pOld) {
714,906!
281
    vmFreeVnodeObj(&pOld);
×
282
  }
283
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
714,906!
284
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
714,906✔
285
  if (r != 0) {
714,906!
286
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
×
287
  }
288

289
  return code;
714,906✔
290
}
291

292
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
4,049,180✔
293
  SVnodeObj *pOld = NULL;
4,049,180✔
294
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);
4,049,180!
295
  int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
4,049,180✔
296
  if (r != 0) {
4,049,180!
297
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
298
  }
299
  if (pOld != NULL) {
4,049,180✔
300
    vmFreeVnodeObj(&pOld);
714,906✔
301
    dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
714,906!
302
    r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
714,906✔
303
    if (r != 0) {
714,906!
304
      dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
×
305
    }
306
  }
307
}
4,049,180✔
308
#ifdef USE_MOUNT
309
int32_t vmAcquireMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, const char *mountName, const char *mountPath,
2,596✔
310
                          STfs **ppTfs) {
311
  int32_t    code = 0, lino = 0;
2,596✔
312
  TdFilePtr  pFile = NULL;
2,596✔
313
  SArray    *pDisks = NULL;
2,596✔
314
  SMountTfs *pMountTfs = NULL;
2,596✔
315
  bool       unlock = false;
2,596✔
316

317
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
2,596✔
318
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
2,596!
319
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
1,292!
320
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
321
    }
322
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
1,292✔
323
    TAOS_RETURN(code);
1,292✔
324
  }
325
  if (!mountPath || mountPath[0] == 0 || mountId == 0) {
1,304!
326
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
327
  }
328
  (void)(taosThreadMutexLock(&pMgmt->mutex));
1,304✔
329
  unlock = true;
1,304✔
330
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
1,304✔
331
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
1,304!
332
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
978!
333
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
334
    }
335
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
978✔
336
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
978✔
337
    TAOS_RETURN(code);
978✔
338
  }
339

340
  TAOS_CHECK_EXIT(vmMountCheckRunning(mountName, mountPath, &pFile, 3));
326!
341
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, mountPath, &pDisks));
326!
342
  int32_t numOfDisks = taosArrayGetSize(pDisks);
326✔
343
  if (numOfDisks <= 0) {
326!
344
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
345
  }
346
  TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
326!
347
  if (mountName) (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", mountName);
326!
348
  if (mountPath) (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", mountPath);
326!
349
  pMountTfs->pFile = pFile;
326✔
350
  atomic_store_32(&pMountTfs->nRef, 2);  // init and acquire
326✔
351
  TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), numOfDisks, &pMountTfs->pTfs));
326!
352
  TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &mountId, sizeof(mountId), &pMountTfs, POINTER_BYTES));
326!
353
_exit:
326✔
354
  if (unlock) {
326!
355
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
326✔
356
  }
357
  taosArrayDestroy(pDisks);
326✔
358
  if (code != 0) {
326!
359
    dError("mount:%" PRIi64 ",%s, failed at line %d to get mount tfs since %s", mountId, mountPath ? mountPath : "NULL",
×
360
           lino, tstrerror(code));
361
    if (pFile) {
×
362
      (void)taosUnLockFile(pFile);
×
363
      (void)taosCloseFile(&pFile);
×
364
    }
365
    if (pMountTfs) {
×
366
      tfsClose(pMountTfs->pTfs);
×
367
      taosMemoryFree(pMountTfs);
×
368
    }
369
    *ppTfs = NULL;
×
370
  } else {
371
    *ppTfs = pMountTfs->pTfs;
326✔
372
  }
373

374
  TAOS_RETURN(code);
326✔
375
}
376
#endif
377

378
bool vmReleaseMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, int32_t minRef) {
2,596✔
379
#ifdef USE_MOUNT
380
  SMountTfs *pMountTfs = NULL;
2,596✔
381
  int32_t    nRef = INT32_MAX, code = 0;
2,596✔
382

383
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
2,596✔
384
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
2,596!
385
    if ((nRef = atomic_sub_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1)) <= minRef) {
2,596✔
386
      (void)(taosThreadMutexLock(&pMgmt->mutex));
323✔
387
      SMountTfs *pTmp = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
323✔
388
      if (pTmp && *(SMountTfs **)pTmp) {
323!
389
        dInfo("mount:%" PRIi64 ", ref:%d, release mount tfs", mountId, nRef);
323!
390
        tfsClose((*(SMountTfs **)pTmp)->pTfs);
323✔
391
        if ((*(SMountTfs **)pTmp)->pFile) {
323!
392
          (void)taosUnLockFile((*(SMountTfs **)pTmp)->pFile);
323✔
393
          (void)taosCloseFile(&(*(SMountTfs **)pTmp)->pFile);
323✔
394
        }
395
        taosMemoryFree(*(SMountTfs **)pTmp);
323!
396
        if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, sizeof(mountId))) < 0) {
323!
397
          dError("failed at line %d to remove mountId:%" PRIi64 " from mount tfs hash", __LINE__, mountId);
×
398
        }
399
      }
400
      (void)taosThreadMutexUnlock(&pMgmt->mutex);
323✔
401
      return true;
323✔
402
    }
403
  }
404
#endif
405
  return false;
2,273✔
406
}
407

408
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
4,047,820✔
409
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
4,047,820!
410
  if (pVnode == NULL) {
4,049,180!
411
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
412
    return -1;
×
413
  }
414

415
  pVnode->vgId = pCfg->vgId;
4,049,180✔
416
  pVnode->vgVersion = pCfg->vgVersion;
4,049,180✔
417
  pVnode->diskPrimary = pCfg->diskPrimary;
4,049,180✔
418
  pVnode->mountId = pCfg->mountId;
4,049,180✔
419
  pVnode->refCount = 0;
4,049,180✔
420
  pVnode->dropped = 0;
4,049,180✔
421
  pVnode->failed = 0;
4,048,740✔
422
  pVnode->path = taosStrdup(pCfg->path);
4,049,180!
423
  pVnode->pImpl = pImpl;
4,049,180✔
424

425
  if (pVnode->path == NULL) {
4,049,180!
426
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
427
    taosMemoryFree(pVnode);
×
428
    return -1;
×
429
  }
430

431
  if (pImpl) {
4,049,180!
432
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
4,049,180!
433
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
434
      taosMemoryFree(pVnode->path);
×
435
      taosMemoryFree(pVnode);
×
436
      return -1;
×
437
    }
438
  } else {
439
    pVnode->failed = 1;
×
440
  }
441

442
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
4,049,180✔
443
  int32_t code = vmRegisterRunningState(pMgmt, pVnode);
4,049,180✔
444
  vmUnRegisterClosedState(pMgmt, pVnode);
4,049,180✔
445
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
4,049,180✔
446

447
  TAOS_RETURN(code);
4,049,180✔
448
}
449

450
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
4,048,977✔
451
  char path[TSDB_FILENAME_LEN] = {0};
4,048,977✔
452
  bool atExit = true;
4,049,180✔
453

454
  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
4,049,180✔
455
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
3,169,774✔
456
  }
457

458
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
4,048,607✔
459
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
4,049,180✔
460
  if (keepClosed) {
4,049,180✔
461
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
714,906!
462
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
463
      return;
×
464
    };
465
  }
466
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
4,049,180✔
467

468
  vmReleaseVnode(pMgmt, pVnode);
4,049,180✔
469

470
  if (pVnode->failed) {
4,049,180!
471
    goto _closed;
×
472
  }
473
  dInfo("vgId:%d, pre close", pVnode->vgId);
4,049,180!
474
  vnodePreClose(pVnode->pImpl);
4,049,180✔
475

476
  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
4,049,180!
477
  while (pVnode->refCount > 0) taosMsleep(10);
4,049,180!
478

479
  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
4,049,180!
480
        taosQueueGetThreadId(pVnode->pWriteW.queue));
481
  tMultiWorkerCleanup(&pVnode->pWriteW);
4,049,180✔
482

483
  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
4,049,180!
484
        taosQueueGetThreadId(pVnode->pSyncW.queue));
485
  tMultiWorkerCleanup(&pVnode->pSyncW);
4,049,180✔
486

487
  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
4,049,180!
488
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
489
  tMultiWorkerCleanup(&pVnode->pSyncRdW);
4,049,180✔
490

491
  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
4,049,180!
492
        taosQueueGetThreadId(pVnode->pApplyW.queue));
493
  tMultiWorkerCleanup(&pVnode->pApplyW);
4,049,180✔
494

495
  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
4,049,180!
496
        taosQueueGetThreadId(pVnode->pFetchQ));
497
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
4,049,180!
498

499
  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
4,049,180!
500
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
4,049,180!
501

502
  dInfo("vgId:%d, wait for vnode stream reader queue:%p is empty", pVnode->vgId, pVnode->pStreamReaderQ);
4,049,180!
503
  while (!taosQueueEmpty(pVnode->pStreamReaderQ)) taosMsleep(10);
4,049,180!
504

505
  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
4,049,180!
506

507
  dInfo("vgId:%d, post close", pVnode->vgId);
4,049,180!
508
  vnodePostClose(pVnode->pImpl);
4,049,180✔
509

510
  vmFreeQueue(pMgmt, pVnode);
4,046,578✔
511

512
  if (commitAndRemoveWal) {
4,049,069✔
513
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
8,565!
514
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
8,565!
515
      dError("vgId:%d, failed to commit data", pVnode->vgId);
×
516
    }
517
    if (vnodeBegin(pVnode->pImpl) != 0) {
8,565!
518
      dError("vgId:%d, failed to begin", pVnode->vgId);
×
519
    }
520
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
8,565!
521
  }
522

523
  int32_t nodeId = vnodeNodeId(pVnode->pImpl);
4,049,069✔
524
  vnodeClose(pVnode->pImpl);
4,049,180✔
525
  pVnode->pImpl = NULL;
4,048,987✔
526

527
_closed:
4,048,045✔
528
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);
4,048,045!
529

530
  if (commitAndRemoveWal) {
4,049,180✔
531
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
8,565!
532
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
8,565!
533
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
8,565!
534
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
×
535
    }
536
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
8,565!
537
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
×
538
    }
539
  }
540

541
  if (pVnode->dropped) {
4,049,180✔
542
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
1,569,136!
543
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
1,569,136!
544
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
1,569,136✔
545
  }
546
  if (pVnode->mountId && vmReleaseMountTfs(pMgmt, pVnode->mountId, pVnode->dropped ? 1 : 0)) {
4,049,180✔
547
    if (vmWriteMountListToFile(pMgmt) != 0) {
323!
548
      dError("vgId:%d, failed at line %d to write mount list since %s", pVnode->vgId, __LINE__, terrstr());
×
549
    }
550
  }
551

552
  vmFreeVnodeObj(&pVnode);
4,049,180✔
553
}
554

555
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
×
556
  int32_t r = 0;
×
557
  r = taosThreadRwlockWrlock(&pMgmt->hashLock);
×
558
  if (r != 0) {
×
559
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
×
560
  }
561
  if (r == 0) {
×
562
    vmUnRegisterRunningState(pMgmt, vgId);
×
563
  }
564
  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
×
565
  if (r != 0) {
×
566
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
567
  }
568
}
×
569

570
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
×
571
  int32_t srcVgId = pCfg->vgId;
×
572
  int32_t dstVgId = pCfg->toVgId;
×
573
  if (dstVgId == 0) return 0;
×
574

575
  char srcPath[TSDB_FILENAME_LEN];
×
576
  char dstPath[TSDB_FILENAME_LEN];
×
577

578
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
×
579
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
×
580

581
  int32_t diskPrimary = pCfg->diskPrimary;
×
582
  int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
×
583
  if (vgId <= 0) {
×
584
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
×
585
    return -1;
×
586
  }
587

588
  pCfg->vgId = vgId;
×
589
  pCfg->toVgId = 0;
×
590
  return 0;
×
591
}
592

593
static void *vmOpenVnodeInThread(void *param) {
386,333✔
594
  SVnodeThread *pThread = param;
386,333✔
595
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
386,333✔
596
  char          path[TSDB_FILENAME_LEN];
385,833✔
597

598
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
386,333!
599
  setThreadName("open-vnodes");
386,333✔
600

601
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
775,562✔
602
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
389,229✔
603
    if (pCfg->dropped) {
389,229!
604
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
605
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
606
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
607
      tmsgReportStartup("vnode-destroy", stepDesc);
×
608

609
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
610
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
611
      pThread->updateVnodesList = true;
×
612
      pThread->dropped++;
×
613
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
614
      continue;
×
615
    }
616

617
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
389,229✔
618
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
389,229!
619
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
620
    tmsgReportStartup("vnode-open", stepDesc);
389,229✔
621

622
    if (pCfg->toVgId) {
389,229!
623
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
624
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
625
        pThread->failed++;
×
626
        continue;
×
627
      }
628
      pThread->updateVnodesList = true;
×
629
    }
630

631
    int32_t diskPrimary = pCfg->mountId == 0 ? pCfg->diskPrimary : 0;
389,229✔
632
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
389,229!
633

634
    STfs *pMountTfs = NULL;
389,229✔
635
#ifdef USE_MOUNT
636
    bool releaseTfs = false;
389,229✔
637
    if (pCfg->mountId) {
389,229✔
638
      if (vmAcquireMountTfs(pMgmt, pCfg->mountId, NULL, NULL, &pMountTfs) != 0) {
1,292!
639
        dError("vgId:%d, failed to get mount tfs by thread:%d", pCfg->vgId, pThread->threadIndex);
×
640
        pThread->failed++;
×
641
        continue;
×
642
      }
643
      releaseTfs = true;
1,292✔
644
    }
645
#endif
646

647
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, false);
389,229✔
648

649
    if (pImpl == NULL) {
389,229!
650
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
651
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
652
        pThread->failed++;
×
653
#ifdef USE_MOUNT
654
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
655
#endif
656
        continue;
×
657
      }
658
    }
659

660
    if (pImpl != NULL) {
389,229!
661
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
389,229!
662
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
663
        pThread->failed++;
×
664
#ifdef USE_MOUNT
665
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
666
#endif
667
        continue;
×
668
      }
669
    }
670

671
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
389,229!
672
    pThread->opened++;
389,229✔
673
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
389,229✔
674
  }
675

676
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
386,333!
677
        pThread->opened, pThread->dropped, pThread->failed);
678
  return NULL;
386,333✔
679
}
680

681
#ifdef USE_MOUNT
682
static int32_t vmOpenMountTfs(SVnodeMgmt *pMgmt) {
744,904✔
683
  int32_t    code = 0, lino = 0;
744,904✔
684
  int32_t    numOfMounts = 0;
744,904✔
685
  SMountCfg *pMountCfgs = NULL;
744,904✔
686
  SArray    *pDisks = NULL;
744,904✔
687
  TdFilePtr  pFile = NULL;
744,904✔
688
  SMountTfs *pMountTfs = NULL;
744,904✔
689

690
  TAOS_CHECK_EXIT(vmGetMountListFromFile(pMgmt, &pMountCfgs, &numOfMounts));
744,904!
691
  for (int32_t i = 0; i < numOfMounts; ++i) {
745,227✔
692
    SMountCfg *pCfg = &pMountCfgs[i];
323✔
693
    if (taosHashGet(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId))) {
323!
694
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
695
    }
696
    TAOS_CHECK_EXIT(vmMountCheckRunning(pCfg->name, pCfg->path, &pFile, 3));
323!
697
    TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pCfg->path, &pDisks));
323!
698
    int32_t nDisks = taosArrayGetSize(pDisks);
323✔
699
    if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
323!
700
      dError("mount:%s, %" PRIi64 ", %s, invalid number of disks:%d, expect 1 to %d", pCfg->name, pCfg->mountId,
×
701
             pCfg->path, nDisks, TFS_MAX_DISKS);
702
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
703
    }
704

705
    TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
323!
706
    TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), TARRAY_SIZE(pDisks), &pMountTfs->pTfs));
323!
707
    (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", pCfg->name);
323✔
708
    (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", pCfg->path);
323✔
709
    pMountTfs->pFile = pFile;
323✔
710
    pMountTfs->nRef = 1;
323✔
711
    TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId), &pMountTfs, POINTER_BYTES));
323!
712
    taosArrayDestroy(pDisks);
323✔
713
    pDisks = NULL;
323✔
714
    pMountTfs = NULL;
323✔
715
    pFile = NULL;
323✔
716
  }
717
_exit:
744,904✔
718
  if (code != 0) {
744,904!
719
    dError("failed to open mount tfs at line %d since %s", lino, tstrerror(code));
×
720
    if (pFile) {
×
721
      (void)taosUnLockFile(pFile);
×
722
      (void)taosCloseFile(&pFile);
×
723
    }
724
    if (pMountTfs) {
×
725
      tfsClose(pMountTfs->pTfs);
×
726
      taosMemoryFree(pMountTfs);
×
727
    }
728
    taosArrayDestroy(pDisks);
×
729
  }
730
  taosMemoryFree(pMountCfgs);
744,904!
731
  TAOS_RETURN(code);
744,904✔
732
}
733
#endif
734
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
744,904✔
735
  pMgmt->runngingHash =
744,904✔
736
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
744,904✔
737
  if (pMgmt->runngingHash == NULL) {
744,904!
738
    dError("failed to init vnode hash since %s", terrstr());
×
739
    return TSDB_CODE_OUT_OF_MEMORY;
×
740
  }
741

742
  pMgmt->closedHash =
744,904✔
743
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
744,904✔
744
  if (pMgmt->closedHash == NULL) {
744,904!
745
    dError("failed to init vnode closed hash since %s", terrstr());
×
746
    return TSDB_CODE_OUT_OF_MEMORY;
×
747
  }
748

749
  pMgmt->creatingHash =
744,904✔
750
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
744,904✔
751
  if (pMgmt->creatingHash == NULL) {
744,904!
752
    dError("failed to init vnode creatingHash hash since %s", terrstr());
×
753
    return TSDB_CODE_OUT_OF_MEMORY;
×
754
  }
755

756
  SWrapperCfg *pCfgs = NULL;
744,904✔
757
  int32_t      numOfVnodes = 0;
744,904✔
758
  int32_t      code = 0;
744,904✔
759
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
744,904!
760
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
×
761
    return code;
×
762
  }
763

764
  pMgmt->state.totalVnodes = numOfVnodes;
744,904✔
765

766
  int32_t threadNum = tsNumOfCores / 2;
744,904!
767
  if (threadNum < 1) threadNum = 1;
744,904!
768
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
744,904!
769

770
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
744,904!
771
  if (threads == NULL) {
744,904!
772
    dError("failed to allocate memory for threads since %s", terrstr());
×
773
    taosMemoryFree(pCfgs);
×
774
    return terrno;
×
775
  }
776

777
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
778
    threads[t].threadIndex = t;
14,898,080✔
779
    threads[t].pMgmt = pMgmt;
14,898,080✔
780
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
14,898,080!
781
  }
782

783
  for (int32_t v = 0; v < numOfVnodes; ++v) {
1,134,133✔
784
    int32_t       t = v % threadNum;
389,229!
785
    SVnodeThread *pThread = &threads[t];
389,229✔
786
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
389,229✔
787
  }
788

789
  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);
744,904!
790

791
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
792
    SVnodeThread *pThread = &threads[t];
14,898,080✔
793
    if (pThread->vnodeNum == 0) continue;
14,898,080✔
794

795
    TdThreadAttr thAttr;
385,833✔
796
    (void)taosThreadAttrInit(&thAttr);
386,333✔
797
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
386,333✔
798
#ifdef TD_COMPACT_OS
799
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
800
#endif
801
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
386,333!
802
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
×
803
    }
804

805
    (void)taosThreadAttrDestroy(&thAttr);
386,333✔
806
  }
807

808
  bool updateVnodesList = false;
744,904✔
809

810
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
811
    SVnodeThread *pThread = &threads[t];
14,898,080✔
812
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
14,898,080!
813
      (void)taosThreadJoin(pThread->thread, NULL);
386,333✔
814
      taosThreadClear(&pThread->thread);
386,333✔
815
    }
816
    taosMemoryFree(pThread->pCfgs);
14,898,080!
817
    if (pThread->updateVnodesList) updateVnodesList = true;
14,898,080!
818
  }
819
  taosMemoryFree(threads);
744,904!
820
  taosMemoryFree(pCfgs);
744,904!
821

822
  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
744,904!
823
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
×
824
    return terrno = TSDB_CODE_VND_INIT_FAILED;
×
825
  }
826

827
  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
744,904!
828
    dError("failed to write vnode list since %s", tstrerror(code));
×
829
    return code;
×
830
  }
831

832
#ifdef USE_MOUNT
833
  bool  updateMountList = false;
744,904✔
834
  void *pIter = NULL;
744,904✔
835
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
745,227✔
836
    SMountTfs *pMountTfs = *(SMountTfs **)pIter;
323✔
837
    if (pMountTfs && atomic_load_32(&pMountTfs->nRef) <= 1) {
323!
838
      size_t  keyLen = 0;
×
839
      int64_t mountId = *(int64_t *)taosHashGetKey(pIter, &keyLen);
×
840
      dInfo("mount:%s, %s, %" PRIi64 ", ref:%d, remove unused mount tfs", pMountTfs->name, pMountTfs->path, mountId,
×
841
            atomic_load_32(&pMountTfs->nRef));
842
      if (pMountTfs->pFile) {
×
843
        (void)taosUnLockFile(pMountTfs->pFile);
×
844
        (void)taosCloseFile(&pMountTfs->pFile);
×
845
      }
846
      tfsClose(pMountTfs->pTfs);
×
847
      taosMemoryFree(pMountTfs);
×
848
      if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, keyLen)) != 0) {
×
849
        dWarn("failed at line %d to remove mount:%s, %s, %" PRIi64 " from mount tfs hash since %s", __LINE__,
×
850
              pMountTfs->name, pMountTfs->path, mountId, tstrerror(code));
851
      }
852
      updateMountList = true;
×
853
    }
854
  }
855
  if (updateMountList && (code = vmWriteMountListToFile(pMgmt)) != 0) {
744,904!
856
    dError("failed to write mount list at line %d since %s", __LINE__, tstrerror(code));
×
857
    return code;
×
858
  }
859
#endif
860

861
  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
744,904!
862
  return 0;
744,904✔
863
}
864

865
static void *vmCloseVnodeInThread(void *param) {
1,732,317✔
866
  SVnodeThread *pThread = param;
1,732,317✔
867
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,732,317✔
868

869
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,732,317!
870
  setThreadName("close-vnodes");
1,732,317✔
871

872
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
3,488,890✔
873
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,756,573✔
874

875
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,756,573✔
876
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
1,756,378!
877
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
878
    tmsgReportStartup("vnode-close", stepDesc);
1,756,573✔
879

880
    vmCloseVnode(pMgmt, pVnode, false, false);
1,755,899✔
881
  }
882

883
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
1,732,317!
884
  return NULL;
1,732,317✔
885
}
886

887
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
744,904✔
888
  int32_t code = 0;
744,904✔
889
  dInfo("start to close all vnodes");
744,904!
890
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
744,904✔
891
  dInfo("vnodes mgmt worker is stopped");
744,904!
892
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
744,904✔
893
  dInfo("vnodes multiple mgmt worker is stopped");
744,904!
894

895
  int32_t     numOfVnodes = 0;
744,904✔
896
  SVnodeObj **ppVnodes = NULL;
744,904✔
897
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
744,904✔
898
  if (code != 0) {
744,904!
899
    dError("failed to get vnode list since %s", tstrerror(code));
×
900
    return;
×
901
  }
902

903
  int32_t threadNum = tsNumOfCores / 2;
744,904!
904
  if (threadNum < 1) threadNum = 1;
744,904!
905
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
744,904!
906

907
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
744,904!
908
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
909
    threads[t].threadIndex = t;
14,898,080✔
910
    threads[t].pMgmt = pMgmt;
14,898,080✔
911
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
14,898,080!
912
  }
913

914
  for (int32_t v = 0; v < numOfVnodes; ++v) {
2,501,477✔
915
    int32_t       t = v % threadNum;
1,756,573!
916
    SVnodeThread *pThread = &threads[t];
1,756,573✔
917
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
1,756,573!
918
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
1,756,573✔
919
    }
920
  }
921

922
  pMgmt->state.openVnodes = 0;
744,904✔
923
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
744,904!
924

925
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
926
    SVnodeThread *pThread = &threads[t];
14,898,080✔
927
    if (pThread->vnodeNum == 0) continue;
14,898,080✔
928

929
    TdThreadAttr thAttr;
1,730,518✔
930
    (void)taosThreadAttrInit(&thAttr);
1,732,317✔
931
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,732,317✔
932
#ifdef TD_COMPACT_OS
933
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
934
#endif
935
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
1,732,317!
936
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
937
    }
938

939
    (void)taosThreadAttrDestroy(&thAttr);
1,732,317✔
940
  }
941

942
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
943
    SVnodeThread *pThread = &threads[t];
14,898,080✔
944
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
14,898,080!
945
      (void)taosThreadJoin(pThread->thread, NULL);
1,732,317✔
946
      taosThreadClear(&pThread->thread);
1,732,317✔
947
    }
948
    taosMemoryFree(pThread->ppVnodes);
14,898,080!
949
  }
950
  taosMemoryFree(threads);
744,904!
951

952
  if (ppVnodes != NULL) {
744,904!
953
    taosMemoryFree(ppVnodes);
744,904!
954
  }
955

956
  if (pMgmt->runngingHash != NULL) {
744,904!
957
    taosHashCleanup(pMgmt->runngingHash);
744,904✔
958
    pMgmt->runngingHash = NULL;
744,904✔
959
  }
960

961
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
744,904✔
962
  while (pIter) {
744,904!
963
    SVnodeObj **ppVnode = pIter;
×
964
    vmFreeVnodeObj(ppVnode);
×
965
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
×
966
  }
967

968
  if (pMgmt->closedHash != NULL) {
744,904!
969
    taosHashCleanup(pMgmt->closedHash);
744,904✔
970
    pMgmt->closedHash = NULL;
744,904✔
971
  }
972

973
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
744,904✔
974
  while (pIter) {
744,904!
975
    SVnodeObj **ppVnode = pIter;
×
976
    vmFreeVnodeObj(ppVnode);
×
977
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
×
978
  }
979

980
  if (pMgmt->creatingHash != NULL) {
744,904!
981
    taosHashCleanup(pMgmt->creatingHash);
744,904✔
982
    pMgmt->creatingHash = NULL;
744,904✔
983
  }
984

985
#ifdef USE_MOUNT
986
  pIter = NULL;
744,904✔
987
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
745,230✔
988
    SMountTfs *mountTfs = *(SMountTfs **)pIter;
326✔
989
    if (mountTfs->pFile) {
326!
990
      (void)taosUnLockFile(mountTfs->pFile);
326✔
991
      (void)taosCloseFile(&mountTfs->pFile);
326✔
992
    }
993
    tfsClose(mountTfs->pTfs);
326✔
994
    taosMemoryFree(mountTfs);
326!
995
  }
996
  taosHashCleanup(pMgmt->mountTfsHash);
744,904✔
997
  pMgmt->mountTfsHash = NULL;
744,904✔
998
#endif
999

1000
  dInfo("total vnodes:%d are all closed", numOfVnodes);
744,904!
1001
}
1002

1003
static void vmCleanup(SVnodeMgmt *pMgmt) {
744,904✔
1004
  vmCloseVnodes(pMgmt);
744,904✔
1005
  vmStopWorker(pMgmt);
744,904✔
1006
  vnodeCleanup();
744,904✔
1007
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
744,904✔
1008
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
744,904✔
1009
  taosMemoryFree(pMgmt);
744,904!
1010
}
744,904✔
1011

1012
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
1,018,612✔
1013
  int32_t     code = 0;
1,018,612✔
1014
  int32_t     numOfVnodes = 0;
1,018,612✔
1015
  SVnodeObj **ppVnodes = NULL;
1,018,612✔
1016
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
1,018,612✔
1017
  if (code != 0) {
1,018,612!
1018
    dError("failed to get vnode list since %s", tstrerror(code));
×
1019
    return;
×
1020
  }
1021

1022
  if (ppVnodes != NULL) {
1,018,612!
1023
    for (int32_t i = 0; i < numOfVnodes; ++i) {
3,993,880✔
1024
      SVnodeObj *pVnode = ppVnodes[i];
2,975,268✔
1025
      if (!pVnode->failed) {
2,975,268!
1026
        vnodeSyncCheckTimeout(pVnode->pImpl);
2,975,268✔
1027
      }
1028
      vmReleaseVnode(pMgmt, pVnode);
2,975,268✔
1029
    }
1030
    taosMemoryFree(ppVnodes);
1,018,612!
1031
  }
1032
}
1033

1034
static void *vmThreadFp(void *param) {
744,904✔
1035
  SVnodeMgmt *pMgmt = param;
744,904✔
1036
  int64_t     lastTime = 0;
744,904✔
1037
  setThreadName("vnode-timer");
744,904✔
1038

1039
  while (1) {
393,249,995✔
1040
    lastTime++;
393,994,899✔
1041
    taosMsleep(100);
393,994,899✔
1042
    if (pMgmt->stop) break;
393,994,899✔
1043
    if (lastTime % 10 != 0) continue;
393,249,995✔
1044

1045
    int64_t sec = lastTime / 10;
38,987,871✔
1046
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
38,987,871✔
1047
      vmCheckSyncTimeout(pMgmt);
1,018,612✔
1048
    }
1049
  }
1050

1051
  return NULL;
744,904✔
1052
}
1053

1054
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
744,904✔
1055
  int32_t      code = 0;
744,904✔
1056
  TdThreadAttr thAttr;
743,517✔
1057
  (void)taosThreadAttrInit(&thAttr);
744,904✔
1058
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
744,904✔
1059
#ifdef TD_COMPACT_OS
1060
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
1061
#endif
1062
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
744,904!
1063
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1064
    dError("failed to create vnode timer thread since %s", tstrerror(code));
×
1065
    return code;
×
1066
  }
1067

1068
  (void)taosThreadAttrDestroy(&thAttr);
744,904✔
1069
  return 0;
744,904✔
1070
}
1071

1072
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
744,904✔
1073
  pMgmt->stop = true;
744,904✔
1074
  if (taosCheckPthreadValid(pMgmt->thread)) {
744,904!
1075
    (void)taosThreadJoin(pMgmt->thread, NULL);
744,904✔
1076
    taosThreadClear(&pMgmt->thread);
744,904✔
1077
  }
1078
}
744,904✔
1079

1080
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
744,904✔
1081
  int32_t code = -1;
744,904✔
1082

1083
  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
744,904!
1084
  if (pMgmt == NULL) {
744,904!
1085
    code = terrno;
×
1086
    goto _OVER;
×
1087
  }
1088

1089
  pMgmt->pData = pInput->pData;
744,904✔
1090
  pMgmt->path = pInput->path;
744,904✔
1091
  pMgmt->name = pInput->name;
744,904✔
1092
  pMgmt->msgCb = pInput->msgCb;
744,904✔
1093
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
744,904✔
1094
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
744,904✔
1095
  pMgmt->msgCb.mgmt = pMgmt;
744,904✔
1096

1097
  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
744,904✔
1098
  if (code != 0) {
744,904!
1099
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1100
    goto _OVER;
×
1101
  }
1102

1103
  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
744,904✔
1104
  if (code != 0) {
744,904!
1105
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1106
    goto _OVER;
×
1107
  }
1108

1109
  pMgmt->pTfs = pInput->pTfs;
744,904✔
1110
  if (pMgmt->pTfs == NULL) {
744,904!
1111
    dError("tfs is null.");
×
1112
    goto _OVER;
×
1113
  }
1114
#ifdef USE_MOUNT
1115
  if (!(pMgmt->mountTfsHash =
744,904!
1116
            taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK))) {
744,904✔
1117
    dError("failed to init mountTfsHash since %s", terrstr());
×
1118
    return TSDB_CODE_OUT_OF_MEMORY;
×
1119
  }
1120
  if ((code = vmOpenMountTfs(pMgmt)) != 0) {
744,904!
1121
    goto _OVER;
×
1122
  }
1123
#endif
1124
  tmsgReportStartup("vnode-tfs", "initialized");
744,904✔
1125
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
744,904!
1126
    dError("failed to init wal since %s", tstrerror(code));
×
1127
    goto _OVER;
×
1128
  }
1129

1130
  tmsgReportStartup("vnode-wal", "initialized");
744,904✔
1131

1132
  if ((code = syncInit()) != 0) {
744,904!
1133
    dError("failed to open sync since %s", tstrerror(code));
×
1134
    goto _OVER;
×
1135
  }
1136
  tmsgReportStartup("vnode-sync", "initialized");
744,904✔
1137

1138
  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
744,904!
1139
    dError("failed to init vnode since %s", tstrerror(code));
×
1140
    goto _OVER;
×
1141
  }
1142
  tmsgReportStartup("vnode-commit", "initialized");
744,904✔
1143

1144
  if ((code = vmStartWorker(pMgmt)) != 0) {
744,904!
1145
    dError("failed to init workers since %s", tstrerror(code));
×
1146
    goto _OVER;
×
1147
  }
1148
  tmsgReportStartup("vnode-worker", "initialized");
744,904✔
1149

1150
  if ((code = vmOpenVnodes(pMgmt)) != 0) {
744,904!
1151
    dError("failed to open all vnodes since %s", tstrerror(code));
×
1152
    goto _OVER;
×
1153
  }
1154
  tmsgReportStartup("vnode-vnodes", "initialized");
744,904✔
1155

1156
  if ((code = udfcOpen()) != 0) {
744,904!
1157
    dError("failed to open udfc in vnode since %s", tstrerror(code));
×
1158
    goto _OVER;
×
1159
  }
1160

1161
  code = 0;
744,904✔
1162

1163
_OVER:
744,904✔
1164
  if (code == 0) {
744,904!
1165
    pOutput->pMgmt = pMgmt;
744,904✔
1166
  } else {
1167
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
×
1168
    vmCleanup(pMgmt);
×
1169
  }
1170

1171
  return code;
744,904✔
1172
}
1173

1174
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
747,081✔
1175
  *required = tsNumOfSupportVnodes > 0;
747,081✔
1176
  return 0;
747,081✔
1177
}
1178

1179
static void *vmRestoreVnodeInThread(void *param) {
386,333✔
1180
  SVnodeThread *pThread = param;
386,333✔
1181
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
386,333✔
1182

1183
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
386,333!
1184
  setThreadName("restore-vnodes");
386,333✔
1185

1186
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
775,562✔
1187
    SVnodeObj *pVnode = pThread->ppVnodes[v];
389,229✔
1188
    if (pVnode->failed) {
389,229!
1189
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
1190
      continue;
×
1191
    }
1192

1193
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
389,229✔
1194
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
389,229!
1195
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
1196
    tmsgReportStartup("vnode-restore", stepDesc);
389,229✔
1197

1198
    int32_t code = vnodeStart(pVnode->pImpl);
389,229✔
1199
    if (code != 0) {
389,229!
1200
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1201
      pThread->failed++;
×
1202
    } else {
1203
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
389,229!
1204
      pThread->opened++;
389,229✔
1205
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
389,229✔
1206
    }
1207
  }
1208

1209
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
386,333!
1210
        pThread->failed);
1211
  return NULL;
386,333✔
1212
}
1213

1214
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
744,904✔
1215
  int32_t     code = 0;
744,904✔
1216
  int32_t     numOfVnodes = 0;
744,904✔
1217
  SVnodeObj **ppVnodes = NULL;
744,904✔
1218
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
744,904✔
1219
  if (code != 0) {
744,904!
1220
    dError("failed to get vnode list since %s", tstrerror(code));
×
1221
    return code;
×
1222
  }
1223

1224
  int32_t threadNum = tsNumOfCores / 2;
744,904!
1225
  if (threadNum < 1) threadNum = 1;
744,904!
1226
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
744,904!
1227

1228
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
744,904!
1229
  if (threads == NULL) {
744,904!
1230
    return terrno;
×
1231
  }
1232

1233
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
1234
    threads[t].threadIndex = t;
14,898,080✔
1235
    threads[t].pMgmt = pMgmt;
14,898,080✔
1236
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
14,898,080!
1237
    if (threads[t].ppVnodes == NULL) {
14,898,080!
1238
      code = terrno;
×
1239
      break;
×
1240
    }
1241
  }
1242

1243
  for (int32_t v = 0; v < numOfVnodes; ++v) {
1,134,133✔
1244
    int32_t       t = v % threadNum;
389,229!
1245
    SVnodeThread *pThread = &threads[t];
389,229✔
1246
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
389,229!
1247
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
389,229✔
1248
    }
1249
  }
1250

1251
  pMgmt->state.openVnodes = 0;
744,904✔
1252
  pMgmt->state.dropVnodes = 0;
744,904✔
1253
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
744,904!
1254

1255
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
1256
    SVnodeThread *pThread = &threads[t];
14,898,080✔
1257
    if (pThread->vnodeNum == 0) continue;
14,898,080✔
1258

1259
    TdThreadAttr thAttr;
385,833✔
1260
    (void)taosThreadAttrInit(&thAttr);
386,333✔
1261
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
386,333✔
1262
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
386,333!
1263
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
1264
    }
1265

1266
    (void)taosThreadAttrDestroy(&thAttr);
386,333✔
1267
  }
1268

1269
  for (int32_t t = 0; t < threadNum; ++t) {
15,642,984✔
1270
    SVnodeThread *pThread = &threads[t];
14,898,080✔
1271
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
14,898,080!
1272
      (void)taosThreadJoin(pThread->thread, NULL);
386,333✔
1273
      taosThreadClear(&pThread->thread);
386,333✔
1274
    }
1275
    taosMemoryFree(pThread->ppVnodes);
14,898,080!
1276
  }
1277
  taosMemoryFree(threads);
744,904!
1278

1279
  for (int32_t i = 0; i < numOfVnodes; ++i) {
1,134,133✔
1280
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
389,229!
1281
    vmReleaseVnode(pMgmt, ppVnodes[i]);
389,229✔
1282
  }
1283

1284
  if (ppVnodes != NULL) {
744,904!
1285
    taosMemoryFree(ppVnodes);
744,904!
1286
  }
1287

1288
  return vmInitTimer(pMgmt);
744,904✔
1289

1290
_exit:
1291
  for (int32_t t = 0; t < threadNum; ++t) {
1292
    SVnodeThread *pThread = &threads[t];
1293
    taosMemoryFree(pThread->ppVnodes);
1294
  }
1295
  taosMemoryFree(threads);
1296
  return code;
1297
}
1298

1299
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
744,904✔
1300

1301
SMgmtFunc vmGetMgmtFunc() {
747,081✔
1302
  SMgmtFunc mgmtFunc = {0};
747,081✔
1303
  mgmtFunc.openFp = vmInit;
747,081✔
1304
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
747,081✔
1305
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
747,081✔
1306
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
747,081✔
1307
  mgmtFunc.requiredFp = vmRequire;
747,081✔
1308
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
747,081✔
1309

1310
  return mgmtFunc;
747,081✔
1311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc