• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4863

26 Nov 2025 05:46AM UTC coverage: 64.539% (+0.2%) from 64.294%
#4863

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

767 of 945 new or added lines in 33 files covered. (81.16%)

701 existing lines in 109 files now uncovered.

158204 of 245129 relevant lines covered (64.54%)

111903199.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.26
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
2,944,096✔
24
  int32_t    diskId = -1;
2,944,096✔
25
  SVnodeObj *pVnode = NULL;
2,944,096✔
26

27
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
2,946,169✔
28
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
2,939,056✔
29
  if (pVnode != NULL) {
2,946,133✔
30
    diskId = pVnode->diskPrimary;
×
31
  }
32
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
2,946,133✔
33
  return diskId;
2,947,290✔
34
}
35

36
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
7,744,648✔
37
  if (!ppVnode || !(*ppVnode)) return;
7,744,648✔
38

39
  SVnodeObj *pVnode = *ppVnode;
7,744,648✔
40

41
  int32_t refCount = atomic_load_32(&pVnode->refCount);
7,745,673✔
42
  while (refCount > 0) {
7,744,648✔
UNCOV
43
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
×
UNCOV
44
    taosMsleep(200);
×
UNCOV
45
    refCount = atomic_load_32(&pVnode->refCount);
×
46
  }
47

48
  taosMemoryFree(pVnode->path);
7,744,648✔
49
  taosMemoryFree(pVnode);
7,744,648✔
50
  ppVnode[0] = NULL;
7,746,289✔
51
}
52

53
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
2,948,077✔
54
  int32_t    code = 0;
2,948,077✔
55
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
2,948,077✔
56
  if (pCreatingVnode == NULL) {
2,948,077✔
57
    dError("failed to alloc vnode since %s", terrstr());
×
58
    return terrno;
×
59
  }
60
  (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
2,948,077✔
61

62
  pCreatingVnode->vgId = vgId;
2,948,077✔
63
  pCreatingVnode->diskPrimary = diskId;
2,948,077✔
64

65
  code = taosThreadRwlockWrlock(&pMgmt->hashLock);
2,948,077✔
66
  if (code != 0) {
2,948,077✔
67
    taosMemoryFree(pCreatingVnode);
×
68
    return code;
×
69
  }
70

71
  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
2,948,077✔
72
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
2,948,077✔
73
  if (code != 0) {
2,948,077✔
74
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
×
75
    taosMemoryFree(pCreatingVnode);
×
76
  }
77

78
  int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
2,948,077✔
79
  if (r != 0) {
2,948,077✔
80
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
81
  }
82

83
  return code;
2,948,077✔
84
}
85

86
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
2,950,621✔
87
  SVnodeObj *pOld = NULL;
2,950,621✔
88

89
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
2,950,621✔
90
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
2,950,621✔
91
  if (r != 0) {
2,950,621✔
92
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
×
93
  }
94
  dTrace("vgId:%d, remove from creating Hash", vgId);
2,950,621✔
95
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
2,950,621✔
96
  if (r != 0) {
2,950,621✔
97
    dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
2,544✔
98
  }
99
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
2,950,621✔
100

101
  if (pOld) {
2,950,621✔
102
    dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
2,948,077✔
103
    vmFreeVnodeObj(&pOld);
2,948,077✔
104
  }
105

106
_OVER:
2,544✔
107
  if (r != 0) {
2,950,621✔
108
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
2,544✔
109
  }
110
}
2,950,621✔
111

112
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
2,942,454✔
113
  int32_t code = 0;
2,942,454✔
114
  STfs   *pTfs = pMgmt->pTfs;
2,942,454✔
115
  int32_t diskId = 0;
2,942,159✔
116
  if (!pTfs) {
2,942,159✔
117
    return diskId;
×
118
  }
119

120
  // search fs
121
  char vnodePath[TSDB_FILENAME_LEN] = {0};
2,942,159✔
122
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
2,939,879✔
123
  char fname[TSDB_FILENAME_LEN] = {0};
2,939,879✔
124
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
2,943,532✔
125
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
2,926,889✔
126
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
2,926,889✔
127

128
  diskId = tfsSearch(pTfs, 0, fname);
2,926,889✔
129
  if (diskId >= 0) {
2,944,232✔
130
    return diskId;
×
131
  }
132
  diskId = tfsSearch(pTfs, 0, fnameTmp);
2,944,232✔
133
  if (diskId >= 0) {
2,942,369✔
134
    return diskId;
×
135
  }
136

137
  // alloc
138
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
2,942,369✔
139
  int32_t     numOfVnodes = 0;
2,944,432✔
140
  SVnodeObj **ppVnodes = NULL;
2,946,738✔
141

142
  code = taosThreadMutexLock(&pMgmt->mutex);
2,946,548✔
143
  if (code != 0) {
2,948,077✔
144
    return code;
×
145
  }
146

147
  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
2,948,077✔
148
  if (code != 0) {
2,948,077✔
149
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
150
    if (r != 0) {
×
151
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
152
    }
153
    return code;
×
154
  }
155

156
  for (int32_t v = 0; v < numOfVnodes; v++) {
12,895,292✔
157
    SVnodeObj *pVnode = ppVnodes[v];
9,947,215✔
158
    disks[pVnode->diskPrimary] += 1;
9,947,215✔
159
  }
160

161
  int32_t minVal = INT_MAX;
2,948,077✔
162
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
2,948,077✔
163
  diskId = 0;
2,948,077✔
164
  for (int32_t id = 0; id < ndisk; id++) {
6,093,477✔
165
    if (minVal > disks[id]) {
3,145,400✔
166
      minVal = disks[id];
2,992,588✔
167
      diskId = id;
2,992,588✔
168
    }
169
  }
170
  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
2,948,077✔
171
  if (code != 0) {
2,948,077✔
172
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
173
    if (r != 0) {
×
174
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
175
    }
176
    goto _OVER;
×
177
  }
178

179
  code = taosThreadMutexUnlock(&pMgmt->mutex);
2,948,077✔
180
  if (code != 0) {
2,948,077✔
181
    goto _OVER;
×
182
  }
183

184
_OVER:
2,948,077✔
185

186
  for (int32_t i = 0; i < numOfVnodes; ++i) {
12,895,292✔
187
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
9,947,215✔
188
    vmReleaseVnode(pMgmt, ppVnodes[i]);
9,947,215✔
189
  }
190
  if (ppVnodes != NULL) {
2,948,077✔
191
    taosMemoryFree(ppVnodes);
2,948,077✔
192
  }
193

194
  if (code != 0) {
2,948,077✔
195
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
×
196
    return code;
×
197
  } else {
198
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
2,948,077✔
199
    return diskId;
2,948,077✔
200
  }
201
}
202

203
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); }
2,950,621✔
204

205
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
1,655,821,732✔
206
  SVnodeObj *pVnode = NULL;
1,655,821,732✔
207

208
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
1,655,849,559✔
209
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
1,656,075,513✔
210
  if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
1,656,004,237✔
211
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
13,190,882✔
212
    dDebug("vgId:%d, acquire vnode failed.", vgId);
13,053,552✔
213
    pVnode = NULL;
13,052,575✔
214
  } else {
215
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
1,642,899,479✔
216
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
1,643,003,055✔
217
  }
218
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
1,656,055,630✔
219

220
  return pVnode;
1,656,080,665✔
221
}
222

223
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVnodeImpl(pMgmt, vgId, true); }
1,651,364,944✔
224

225
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
1,683,641,258✔
226
  if (pVnode == NULL) return;
1,683,641,258✔
227

228
  //(void)taosThreadRwlockRdlock(&pMgmt->lock);
229
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
1,683,638,714✔
230
  dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
1,683,672,420✔
231
  //(void)taosThreadRwlockUnlock(&pMgmt->lock);
232
}
233

234
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
4,092,910✔
235
  SVnodeObj *pOld = NULL;
4,092,910✔
236
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);
4,092,910✔
237

238
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
4,092,910✔
239
  if (r != 0) {
4,092,910✔
240
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
×
241
  }
242
  if (pOld) {
4,092,910✔
243
    vmFreeVnodeObj(&pOld);
×
244
  }
245
  int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
4,092,910✔
246

247
  return code;
4,092,910✔
248
}
249

250
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
4,092,910✔
251
  dInfo("vgId:%d, remove from hash", vgId);
4,092,910✔
252
  int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
4,092,910✔
253
  if (r != 0) {
4,092,910✔
254
    dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r));
×
255
  }
256
}
4,092,910✔
257

258
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
705,302✔
259
  int32_t    code = 0;
705,302✔
260
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
705,302✔
261
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
705,302✔
262
  if (pClosedVnode == NULL) {
705,302✔
263
    dError("failed to alloc vnode since %s", terrstr());
×
264
    return terrno;
×
265
  }
266
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
705,302✔
267

268
  pClosedVnode->vgId = pVnode->vgId;
705,302✔
269
  pClosedVnode->dropped = pVnode->dropped;
705,302✔
270
  pClosedVnode->vgVersion = pVnode->vgVersion;
705,302✔
271
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
705,302✔
272
  pClosedVnode->toVgId = pVnode->toVgId;
705,302✔
273
  pClosedVnode->mountId = pVnode->mountId;
705,302✔
274

275
  SVnodeObj *pOld = NULL;
705,302✔
276
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
705,302✔
277
  if (r != 0) {
705,302✔
278
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
279
  }
280
  if (pOld) {
705,302✔
281
    vmFreeVnodeObj(&pOld);
×
282
  }
283
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
705,302✔
284
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
705,302✔
285
  if (r != 0) {
705,302✔
286
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
×
287
  }
288

289
  return code;
705,302✔
290
}
291

292
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
4,092,910✔
293
  SVnodeObj *pOld = NULL;
4,092,910✔
294
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);
4,092,910✔
295
  int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
4,092,910✔
296
  if (r != 0) {
4,092,910✔
297
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
298
  }
299
  if (pOld != NULL) {
4,092,910✔
300
    vmFreeVnodeObj(&pOld);
705,302✔
301
    dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
705,302✔
302
    r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
705,302✔
303
    if (r != 0) {
705,302✔
NEW
304
      if (r == TSDB_CODE_NOT_FOUND) {
×
NEW
305
        dWarn("vgId:%d, vnode not found in closedHash when unregistering", pVnode->vgId);
×
306
      } else {
NEW
307
        dError("vgId:%d, failed to remove vnode from hash when unregistering since %s", pVnode->vgId, tstrerror(r));
×
308
      }
309
    }
310
  }
311
}
4,092,910✔
312
#ifdef USE_MOUNT
313
int32_t vmAcquireMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, const char *mountName, const char *mountPath,
5,080✔
314
                          STfs **ppTfs) {
315
  int32_t    code = 0, lino = 0;
5,080✔
316
  TdFilePtr  pFile = NULL;
5,080✔
317
  SArray    *pDisks = NULL;
5,080✔
318
  SMountTfs *pMountTfs = NULL;
5,080✔
319
  bool       unlock = false;
5,080✔
320

321
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
5,080✔
322
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
5,080✔
323
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
3,172✔
324
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
325
    }
326
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
3,172✔
327
    TAOS_RETURN(code);
3,172✔
328
  }
329
  if (!mountPath || mountPath[0] == 0 || mountId == 0) {
1,908✔
330
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
331
  }
332
  (void)(taosThreadMutexLock(&pMgmt->mutex));
1,908✔
333
  unlock = true;
1,908✔
334
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
1,908✔
335
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
1,908✔
336
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
1,272✔
337
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
338
    }
339
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
1,272✔
340
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
1,272✔
341
    TAOS_RETURN(code);
1,272✔
342
  }
343

344
  TAOS_CHECK_EXIT(vmMountCheckRunning(mountName, mountPath, &pFile, 3));
636✔
345
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, mountPath, &pDisks));
636✔
346
  int32_t numOfDisks = taosArrayGetSize(pDisks);
636✔
347
  if (numOfDisks <= 0) {
636✔
348
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
349
  }
350
  TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
636✔
351
  if (mountName) (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", mountName);
636✔
352
  if (mountPath) (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", mountPath);
636✔
353
  pMountTfs->pFile = pFile;
636✔
354
  atomic_store_32(&pMountTfs->nRef, 2);  // init and acquire
636✔
355
  TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), numOfDisks, &pMountTfs->pTfs));
636✔
356
  TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &mountId, sizeof(mountId), &pMountTfs, POINTER_BYTES));
636✔
357
_exit:
636✔
358
  if (unlock) {
636✔
359
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
636✔
360
  }
361
  taosArrayDestroy(pDisks);
636✔
362
  if (code != 0) {
636✔
363
    dError("mount:%" PRIi64 ",%s, failed at line %d to get mount tfs since %s", mountId, mountPath ? mountPath : "NULL",
×
364
           lino, tstrerror(code));
365
    if (pFile) {
×
366
      (void)taosUnLockFile(pFile);
×
367
      (void)taosCloseFile(&pFile);
×
368
    }
369
    if (pMountTfs) {
×
370
      tfsClose(pMountTfs->pTfs);
×
371
      taosMemoryFree(pMountTfs);
×
372
    }
373
    *ppTfs = NULL;
×
374
  } else {
375
    *ppTfs = pMountTfs->pTfs;
636✔
376
  }
377

378
  TAOS_RETURN(code);
636✔
379
}
380
#endif
381

382
bool vmReleaseMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, int32_t minRef) {
5,080✔
383
#ifdef USE_MOUNT
384
  SMountTfs *pMountTfs = NULL;
5,080✔
385
  int32_t    nRef = INT32_MAX, code = 0;
5,080✔
386

387
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
5,080✔
388
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
5,080✔
389
    if ((nRef = atomic_sub_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1)) <= minRef) {
5,080✔
390
      (void)(taosThreadMutexLock(&pMgmt->mutex));
634✔
391
      SMountTfs *pTmp = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
634✔
392
      if (pTmp && *(SMountTfs **)pTmp) {
634✔
393
        dInfo("mount:%" PRIi64 ", ref:%d, release mount tfs", mountId, nRef);
634✔
394
        tfsClose((*(SMountTfs **)pTmp)->pTfs);
634✔
395
        if ((*(SMountTfs **)pTmp)->pFile) {
634✔
396
          (void)taosUnLockFile((*(SMountTfs **)pTmp)->pFile);
634✔
397
          (void)taosCloseFile(&(*(SMountTfs **)pTmp)->pFile);
634✔
398
        }
399
        taosMemoryFree(*(SMountTfs **)pTmp);
634✔
400
        if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, sizeof(mountId))) < 0) {
634✔
401
          dError("failed at line %d to remove mountId:%" PRIi64 " from mount tfs hash", __LINE__, mountId);
×
402
        }
403
      }
404
      (void)taosThreadMutexUnlock(&pMgmt->mutex);
634✔
405
      return true;
634✔
406
    }
407
  }
408
#endif
409
  return false;
4,446✔
410
}
411

412
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
4,092,114✔
413
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
4,092,114✔
414
  if (pVnode == NULL) {
4,092,552✔
415
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
416
    return -1;
×
417
  }
418

419
  pVnode->vgId = pCfg->vgId;
4,092,552✔
420
  pVnode->vgVersion = pCfg->vgVersion;
4,092,552✔
421
  pVnode->diskPrimary = pCfg->diskPrimary;
4,092,910✔
422
  pVnode->mountId = pCfg->mountId;
4,092,552✔
423
  pVnode->refCount = 0;
4,092,910✔
424
  pVnode->dropped = 0;
4,092,552✔
425
  pVnode->failed = 0;
4,092,552✔
426
  pVnode->path = taosStrdup(pCfg->path);
4,092,545✔
427
  pVnode->pImpl = pImpl;
4,092,910✔
428

429
  if (pVnode->path == NULL) {
4,092,552✔
430
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
431
    taosMemoryFree(pVnode);
×
432
    return -1;
×
433
  }
434

435
  if (pImpl) {
4,092,545✔
436
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
4,092,545✔
437
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
438
      taosMemoryFree(pVnode->path);
×
439
      taosMemoryFree(pVnode);
×
440
      return -1;
×
441
    }
442
  } else {
443
    pVnode->failed = 1;
×
444
  }
445

446
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
4,092,910✔
447
  int32_t code = vmRegisterRunningState(pMgmt, pVnode);
4,092,910✔
448
  vmUnRegisterClosedState(pMgmt, pVnode);
4,092,910✔
449
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
4,092,910✔
450

451
  TAOS_RETURN(code);
4,092,910✔
452
}
453

454
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
4,092,532✔
455
  char path[TSDB_FILENAME_LEN] = {0};
4,092,532✔
456
  bool atExit = true;
4,092,910✔
457

458
  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
4,092,910✔
459
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
3,160,225✔
460
  }
461

462
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
4,092,223✔
463
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
4,092,910✔
464
  if (keepClosed) {
4,092,910✔
465
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
705,302✔
466
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
467
      return;
×
468
    };
469
  }
470
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
4,092,910✔
471

472
  vmReleaseVnode(pMgmt, pVnode);
4,092,910✔
473

474
  if (pVnode->failed) {
4,092,910✔
475
    goto _closed;
×
476
  }
477
  dInfo("vgId:%d, pre close", pVnode->vgId);
4,092,910✔
478
  vnodePreClose(pVnode->pImpl);
4,092,910✔
479

480
  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
4,092,910✔
481
  while (pVnode->refCount > 0) taosMsleep(10);
4,092,910✔
482

483
  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
4,092,910✔
484
        taosQueueGetThreadId(pVnode->pWriteW.queue));
485
  tMultiWorkerCleanup(&pVnode->pWriteW);
4,092,910✔
486

487
  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
4,092,910✔
488
        taosQueueGetThreadId(pVnode->pSyncW.queue));
489
  tMultiWorkerCleanup(&pVnode->pSyncW);
4,092,910✔
490

491
  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
4,092,910✔
492
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
493
  tMultiWorkerCleanup(&pVnode->pSyncRdW);
4,092,910✔
494

495
  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
4,092,910✔
496
        taosQueueGetThreadId(pVnode->pApplyW.queue));
497
  tMultiWorkerCleanup(&pVnode->pApplyW);
4,092,910✔
498

499
  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
4,092,816✔
500
        taosQueueGetThreadId(pVnode->pFetchQ));
501
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
4,092,910✔
502

503
  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
4,092,910✔
504
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
4,092,910✔
505

506
  dInfo("vgId:%d, wait for vnode stream reader queue:%p is empty", pVnode->vgId, pVnode->pStreamReaderQ);
4,092,910✔
507
  while (!taosQueueEmpty(pVnode->pStreamReaderQ)) taosMsleep(10);
4,092,910✔
508

509
  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
4,092,910✔
510

511
  dInfo("vgId:%d, post close", pVnode->vgId);
4,092,625✔
512
  vnodePostClose(pVnode->pImpl);
4,092,910✔
513

514
  vmFreeQueue(pMgmt, pVnode);
4,092,910✔
515

516
  if (commitAndRemoveWal) {
4,092,910✔
517
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
20,049✔
518
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
20,049✔
519
      dError("vgId:%d, failed to commit data", pVnode->vgId);
×
520
    }
521
    if (vnodeBegin(pVnode->pImpl) != 0) {
20,049✔
522
      dError("vgId:%d, failed to begin", pVnode->vgId);
×
523
    }
524
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
20,049✔
525
  }
526

527
  int32_t nodeId = vnodeNodeId(pVnode->pImpl);
4,092,910✔
528
  vnodeClose(pVnode->pImpl);
4,092,910✔
529
  pVnode->pImpl = NULL;
4,088,887✔
530

531
_closed:
4,088,876✔
532
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);
4,088,226✔
533

534
  if (commitAndRemoveWal) {
4,092,910✔
535
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
20,049✔
536
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
20,049✔
537
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
20,049✔
538
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
×
539
    }
540
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
20,049✔
541
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
×
542
    }
543
  }
544

545
  if (pVnode->dropped) {
4,092,910✔
546
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
1,587,067✔
547
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
1,587,067✔
548
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
1,587,067✔
549
  }
550
  if (pVnode->mountId && vmReleaseMountTfs(pMgmt, pVnode->mountId, pVnode->dropped ? 1 : 0)) {
4,092,910✔
551
    if (vmWriteMountListToFile(pMgmt) != 0) {
634✔
552
      dError("vgId:%d, failed at line %d to write mount list since %s", pVnode->vgId, __LINE__, terrstr());
×
553
    }
554
  }
555

556
  vmFreeVnodeObj(&pVnode);
4,092,535✔
557
}
558

559
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
×
560
  int32_t r = 0;
×
561
  r = taosThreadRwlockWrlock(&pMgmt->hashLock);
×
562
  if (r != 0) {
×
563
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
×
564
  }
565
  if (r == 0) {
×
566
    vmUnRegisterRunningState(pMgmt, vgId);
×
567
  }
568
  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
×
569
  if (r != 0) {
×
570
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
571
  }
572
}
×
573

574
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
×
575
  int32_t srcVgId = pCfg->vgId;
×
576
  int32_t dstVgId = pCfg->toVgId;
×
577
  if (dstVgId == 0) return 0;
×
578

579
  char srcPath[TSDB_FILENAME_LEN];
×
580
  char dstPath[TSDB_FILENAME_LEN];
×
581

582
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
×
583
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
×
584

585
  int32_t diskPrimary = pCfg->diskPrimary;
×
586
  int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
×
587
  if (vgId <= 0) {
×
588
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
×
589
    return -1;
×
590
  }
591

592
  pCfg->vgId = vgId;
×
593
  pCfg->toVgId = 0;
×
594
  return 0;
×
595
}
596

597
static void *vmOpenVnodeInThread(void *param) {
412,892✔
598
  SVnodeThread *pThread = param;
412,892✔
599
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
412,892✔
600
  char          path[TSDB_FILENAME_LEN];
411,972✔
601

602
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
412,892✔
603
  setThreadName("open-vnodes");
412,892✔
604

605
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
829,830✔
606
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
416,938✔
607
    if (pCfg->dropped) {
416,938✔
608
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
609
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
610
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
611
      tmsgReportStartup("vnode-destroy", stepDesc);
×
612

613
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
614
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
615
      pThread->updateVnodesList = true;
×
616
      pThread->dropped++;
×
617
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
618
      continue;
×
619
    }
620

621
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
416,938✔
622
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
416,938✔
623
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
624
    tmsgReportStartup("vnode-open", stepDesc);
416,938✔
625

626
    if (pCfg->toVgId) {
416,938✔
627
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
628
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
629
        pThread->failed++;
×
630
        continue;
×
631
      }
632
      pThread->updateVnodesList = true;
×
633
    }
634

635
    int32_t diskPrimary = pCfg->mountId == 0 ? pCfg->diskPrimary : 0;
416,938✔
636
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
416,938✔
637

638
    STfs *pMountTfs = NULL;
416,938✔
639
#ifdef USE_MOUNT
640
    bool releaseTfs = false;
416,938✔
641
    if (pCfg->mountId) {
416,938✔
642
      if (vmAcquireMountTfs(pMgmt, pCfg->mountId, NULL, NULL, &pMountTfs) != 0) {
2,536✔
643
        dError("vgId:%d, failed to get mount tfs by thread:%d", pCfg->vgId, pThread->threadIndex);
×
644
        pThread->failed++;
×
645
        continue;
×
646
      }
647
      releaseTfs = true;
2,536✔
648
    }
649
#endif
650

651
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, false);
416,938✔
652

653
    if (pImpl == NULL) {
416,938✔
654
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
655
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
656
        pThread->failed++;
×
657
#ifdef USE_MOUNT
658
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
659
#endif
660
        continue;
×
661
      }
662
    }
663

664
    if (pImpl != NULL) {
416,938✔
665
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
416,938✔
666
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
667
        pThread->failed++;
×
668
#ifdef USE_MOUNT
669
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
670
#endif
671
        continue;
×
672
      }
673
    }
674

675
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
416,938✔
676
    pThread->opened++;
416,938✔
677
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
416,938✔
678
  }
679

680
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
410,001✔
681
        pThread->opened, pThread->dropped, pThread->failed);
682
  return NULL;
410,001✔
683
}
684

685
#ifdef USE_MOUNT
686
static int32_t vmOpenMountTfs(SVnodeMgmt *pMgmt) {
667,629✔
687
  int32_t    code = 0, lino = 0;
667,629✔
688
  int32_t    numOfMounts = 0;
667,629✔
689
  SMountCfg *pMountCfgs = NULL;
667,629✔
690
  SArray    *pDisks = NULL;
667,629✔
691
  TdFilePtr  pFile = NULL;
667,629✔
692
  SMountTfs *pMountTfs = NULL;
667,629✔
693

694
  TAOS_CHECK_EXIT(vmGetMountListFromFile(pMgmt, &pMountCfgs, &numOfMounts));
667,629✔
695
  for (int32_t i = 0; i < numOfMounts; ++i) {
668,263✔
696
    SMountCfg *pCfg = &pMountCfgs[i];
634✔
697
    if (taosHashGet(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId))) {
634✔
698
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
699
    }
700
    TAOS_CHECK_EXIT(vmMountCheckRunning(pCfg->name, pCfg->path, &pFile, 3));
634✔
701
    TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pCfg->path, &pDisks));
634✔
702
    int32_t nDisks = taosArrayGetSize(pDisks);
634✔
703
    if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
634✔
704
      dError("mount:%s, %" PRIi64 ", %s, invalid number of disks:%d, expect 1 to %d", pCfg->name, pCfg->mountId,
×
705
             pCfg->path, nDisks, TFS_MAX_DISKS);
706
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
707
    }
708

709
    TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
634✔
710
    TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), TARRAY_SIZE(pDisks), &pMountTfs->pTfs));
634✔
711
    (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", pCfg->name);
634✔
712
    (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", pCfg->path);
634✔
713
    pMountTfs->pFile = pFile;
634✔
714
    pMountTfs->nRef = 1;
634✔
715
    TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId), &pMountTfs, POINTER_BYTES));
634✔
716
    taosArrayDestroy(pDisks);
634✔
717
    pDisks = NULL;
634✔
718
    pMountTfs = NULL;
634✔
719
    pFile = NULL;
634✔
720
  }
721
_exit:
667,629✔
722
  if (code != 0) {
667,629✔
723
    dError("failed to open mount tfs at line %d since %s", lino, tstrerror(code));
×
724
    if (pFile) {
×
725
      (void)taosUnLockFile(pFile);
×
726
      (void)taosCloseFile(&pFile);
×
727
    }
728
    if (pMountTfs) {
×
729
      tfsClose(pMountTfs->pTfs);
×
730
      taosMemoryFree(pMountTfs);
×
731
    }
732
    taosArrayDestroy(pDisks);
×
733
  }
734
  taosMemoryFree(pMountCfgs);
667,629✔
735
  TAOS_RETURN(code);
667,629✔
736
}
737
#endif
738
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
667,629✔
739
  pMgmt->runngingHash =
667,629✔
740
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
667,629✔
741
  if (pMgmt->runngingHash == NULL) {
667,629✔
742
    dError("failed to init vnode hash since %s", terrstr());
×
743
    return TSDB_CODE_OUT_OF_MEMORY;
×
744
  }
745

746
  pMgmt->closedHash =
667,629✔
747
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
667,629✔
748
  if (pMgmt->closedHash == NULL) {
667,629✔
749
    dError("failed to init vnode closed hash since %s", terrstr());
×
750
    return TSDB_CODE_OUT_OF_MEMORY;
×
751
  }
752

753
  pMgmt->creatingHash =
667,629✔
754
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
667,629✔
755
  if (pMgmt->creatingHash == NULL) {
667,629✔
756
    dError("failed to init vnode creatingHash hash since %s", terrstr());
×
757
    return TSDB_CODE_OUT_OF_MEMORY;
×
758
  }
759

760
  SWrapperCfg *pCfgs = NULL;
667,629✔
761
  int32_t      numOfVnodes = 0;
667,629✔
762
  int32_t      code = 0;
667,629✔
763
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
667,629✔
764
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
×
765
    return code;
×
766
  }
767

768
  pMgmt->state.totalVnodes = numOfVnodes;
667,629✔
769

770
  int32_t threadNum = tsNumOfCores / 2;
667,629✔
771
  if (threadNum < 1) threadNum = 1;
667,629✔
772
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
667,629✔
773

774
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
667,629✔
775
  if (threads == NULL) {
667,629✔
776
    dError("failed to allocate memory for threads since %s", terrstr());
×
777
    taosMemoryFree(pCfgs);
×
778
    return terrno;
×
779
  }
780

781
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
782
    threads[t].threadIndex = t;
13,352,580✔
783
    threads[t].pMgmt = pMgmt;
13,352,580✔
784
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
13,352,580✔
785
  }
786

787
  for (int32_t v = 0; v < numOfVnodes; ++v) {
1,084,567✔
788
    int32_t       t = v % threadNum;
416,938✔
789
    SVnodeThread *pThread = &threads[t];
416,938✔
790
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
416,938✔
791
  }
792

793
  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);
667,629✔
794

795
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
796
    SVnodeThread *pThread = &threads[t];
13,352,580✔
797
    if (pThread->vnodeNum == 0) continue;
13,352,580✔
798

799
    TdThreadAttr thAttr;
411,972✔
800
    (void)taosThreadAttrInit(&thAttr);
412,892✔
801
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
412,892✔
802
#ifdef TD_COMPACT_OS
803
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
804
#endif
805
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
412,892✔
806
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
×
807
    }
808

809
    (void)taosThreadAttrDestroy(&thAttr);
412,892✔
810
  }
811

812
  bool updateVnodesList = false;
667,629✔
813

814
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
815
    SVnodeThread *pThread = &threads[t];
13,352,580✔
816
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
13,352,580✔
817
      (void)taosThreadJoin(pThread->thread, NULL);
412,892✔
818
      taosThreadClear(&pThread->thread);
412,892✔
819
    }
820
    taosMemoryFree(pThread->pCfgs);
13,352,580✔
821
    if (pThread->updateVnodesList) updateVnodesList = true;
13,352,580✔
822
  }
823
  taosMemoryFree(threads);
667,629✔
824
  taosMemoryFree(pCfgs);
667,629✔
825

826
  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
667,629✔
827
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
×
828
    return terrno = TSDB_CODE_VND_INIT_FAILED;
×
829
  }
830

831
  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
667,629✔
832
    dError("failed to write vnode list since %s", tstrerror(code));
×
833
    return code;
×
834
  }
835

836
#ifdef USE_MOUNT
837
  bool  updateMountList = false;
667,629✔
838
  void *pIter = NULL;
667,629✔
839
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
668,263✔
840
    SMountTfs *pMountTfs = *(SMountTfs **)pIter;
634✔
841
    if (pMountTfs && atomic_load_32(&pMountTfs->nRef) <= 1) {
634✔
842
      size_t  keyLen = 0;
×
843
      int64_t mountId = *(int64_t *)taosHashGetKey(pIter, &keyLen);
×
844
      dInfo("mount:%s, %s, %" PRIi64 ", ref:%d, remove unused mount tfs", pMountTfs->name, pMountTfs->path, mountId,
×
845
            atomic_load_32(&pMountTfs->nRef));
846
      if (pMountTfs->pFile) {
×
847
        (void)taosUnLockFile(pMountTfs->pFile);
×
848
        (void)taosCloseFile(&pMountTfs->pFile);
×
849
      }
850
      tfsClose(pMountTfs->pTfs);
×
851
      taosMemoryFree(pMountTfs);
×
852
      if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, keyLen)) != 0) {
×
853
        dWarn("failed at line %d to remove mount:%s, %s, %" PRIi64 " from mount tfs hash since %s", __LINE__,
×
854
              pMountTfs->name, pMountTfs->path, mountId, tstrerror(code));
855
      }
856
      updateMountList = true;
×
857
    }
858
  }
859
  if (updateMountList && (code = vmWriteMountListToFile(pMgmt)) != 0) {
667,629✔
860
    dError("failed to write mount list at line %d since %s", __LINE__, tstrerror(code));
×
861
    return code;
×
862
  }
863
#endif
864

865
  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
667,629✔
866
  return 0;
667,629✔
867
}
868

869
static void *vmCloseVnodeInThread(void *param) {
1,731,085✔
870
  SVnodeThread *pThread = param;
1,731,085✔
871
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,731,085✔
872

873
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,731,085✔
874
  setThreadName("close-vnodes");
1,731,085✔
875

876
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
3,510,961✔
877
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,780,492✔
878

879
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,780,492✔
880
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
1,780,189✔
881
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
882
    tmsgReportStartup("vnode-close", stepDesc);
1,780,492✔
883

884
    vmCloseVnode(pMgmt, pVnode, false, false);
1,780,492✔
885
  }
886

887
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
1,731,085✔
888
  return NULL;
1,731,085✔
889
}
890

891
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
667,629✔
892
  int32_t code = 0;
667,629✔
893
  dInfo("start to close all vnodes");
667,629✔
894
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
667,629✔
895
  dInfo("vnodes mgmt worker is stopped");
667,629✔
896
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
667,629✔
897
  dInfo("vnodes multiple mgmt worker is stopped");
667,629✔
898

899
  int32_t     numOfVnodes = 0;
667,629✔
900
  SVnodeObj **ppVnodes = NULL;
667,629✔
901
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
667,629✔
902
  if (code != 0) {
667,629✔
903
    dError("failed to get vnode list since %s", tstrerror(code));
×
904
    return;
×
905
  }
906

907
  int32_t threadNum = tsNumOfCores / 2;
667,629✔
908
  if (threadNum < 1) threadNum = 1;
667,629✔
909
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
667,629✔
910

911
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
667,629✔
912
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
913
    threads[t].threadIndex = t;
13,352,580✔
914
    threads[t].pMgmt = pMgmt;
13,352,580✔
915
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
13,352,580✔
916
  }
917

918
  for (int32_t v = 0; v < numOfVnodes; ++v) {
2,448,121✔
919
    int32_t       t = v % threadNum;
1,780,492✔
920
    SVnodeThread *pThread = &threads[t];
1,780,492✔
921
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
1,780,492✔
922
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
1,780,492✔
923
    }
924
  }
925

926
  pMgmt->state.openVnodes = 0;
667,629✔
927
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
667,629✔
928

929
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
930
    SVnodeThread *pThread = &threads[t];
13,352,580✔
931
    if (pThread->vnodeNum == 0) continue;
13,352,580✔
932

933
    TdThreadAttr thAttr;
1,727,216✔
934
    (void)taosThreadAttrInit(&thAttr);
1,731,085✔
935
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,731,085✔
936
#ifdef TD_COMPACT_OS
937
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
938
#endif
939
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
1,731,085✔
940
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
941
    }
942

943
    (void)taosThreadAttrDestroy(&thAttr);
1,731,085✔
944
  }
945

946
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
947
    SVnodeThread *pThread = &threads[t];
13,352,580✔
948
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
13,352,580✔
949
      (void)taosThreadJoin(pThread->thread, NULL);
1,731,085✔
950
      taosThreadClear(&pThread->thread);
1,731,085✔
951
    }
952
    taosMemoryFree(pThread->ppVnodes);
13,352,580✔
953
  }
954
  taosMemoryFree(threads);
667,629✔
955

956
  if (ppVnodes != NULL) {
667,629✔
957
    taosMemoryFree(ppVnodes);
667,629✔
958
  }
959

960
  if (pMgmt->runngingHash != NULL) {
667,629✔
961
    taosHashCleanup(pMgmt->runngingHash);
667,629✔
962
    pMgmt->runngingHash = NULL;
667,629✔
963
  }
964

965
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
667,629✔
966
  while (pIter) {
667,629✔
967
    SVnodeObj **ppVnode = pIter;
×
968
    vmFreeVnodeObj(ppVnode);
×
969
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
×
970
  }
971

972
  if (pMgmt->closedHash != NULL) {
667,629✔
973
    taosHashCleanup(pMgmt->closedHash);
667,629✔
974
    pMgmt->closedHash = NULL;
667,629✔
975
  }
976

977
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
667,629✔
978
  while (pIter) {
667,629✔
979
    SVnodeObj **ppVnode = pIter;
×
980
    vmFreeVnodeObj(ppVnode);
×
981
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
×
982
  }
983

984
  if (pMgmt->creatingHash != NULL) {
667,629✔
985
    taosHashCleanup(pMgmt->creatingHash);
667,629✔
986
    pMgmt->creatingHash = NULL;
667,629✔
987
  }
988

989
#ifdef USE_MOUNT
990
  pIter = NULL;
667,629✔
991
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
668,265✔
992
    SMountTfs *mountTfs = *(SMountTfs **)pIter;
636✔
993
    if (mountTfs->pFile) {
636✔
994
      (void)taosUnLockFile(mountTfs->pFile);
636✔
995
      (void)taosCloseFile(&mountTfs->pFile);
636✔
996
    }
997
    tfsClose(mountTfs->pTfs);
636✔
998
    taosMemoryFree(mountTfs);
636✔
999
  }
1000
  taosHashCleanup(pMgmt->mountTfsHash);
667,629✔
1001
  pMgmt->mountTfsHash = NULL;
667,629✔
1002
#endif
1003

1004
  dInfo("total vnodes:%d are all closed", numOfVnodes);
667,629✔
1005
}
1006

1007
static void vmCleanup(SVnodeMgmt *pMgmt) {
667,629✔
1008
  vmCloseVnodes(pMgmt);
667,629✔
1009
  vmStopWorker(pMgmt);
667,629✔
1010
  vnodeCleanup();
667,629✔
1011
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
667,629✔
1012
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
667,629✔
1013
  taosMemoryFree(pMgmt);
667,629✔
1014
}
667,629✔
1015

1016
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
1,166,792✔
1017
  int32_t     code = 0;
1,166,792✔
1018
  int32_t     numOfVnodes = 0;
1,166,792✔
1019
  SVnodeObj **ppVnodes = NULL;
1,166,792✔
1020
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
1,166,792✔
1021
  if (code != 0) {
1,166,792✔
1022
    dError("failed to get vnode list since %s", tstrerror(code));
×
1023
    return;
×
1024
  }
1025

1026
  if (ppVnodes != NULL) {
1,166,792✔
1027
    for (int32_t i = 0; i < numOfVnodes; ++i) {
4,714,686✔
1028
      SVnodeObj *pVnode = ppVnodes[i];
3,547,894✔
1029
      if (!pVnode->failed) {
3,547,894✔
1030
        vnodeSyncCheckTimeout(pVnode->pImpl);
3,547,894✔
1031
      }
1032
      vmReleaseVnode(pMgmt, pVnode);
3,547,894✔
1033
    }
1034
    taosMemoryFree(ppVnodes);
1,166,792✔
1035
  }
1036
}
1037

1038
static void *vmThreadFp(void *param) {
667,629✔
1039
  SVnodeMgmt *pMgmt = param;
667,629✔
1040
  int64_t     lastTime = 0;
667,629✔
1041
  setThreadName("vnode-timer");
667,629✔
1042

1043
  while (1) {
433,271,246✔
1044
    lastTime++;
433,938,875✔
1045
    taosMsleep(100);
433,938,875✔
1046
    if (pMgmt->stop) break;
433,938,875✔
1047
    if (lastTime % 10 != 0) continue;
433,271,246✔
1048

1049
    int64_t sec = lastTime / 10;
43,018,198✔
1050
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
43,018,198✔
1051
      vmCheckSyncTimeout(pMgmt);
1,166,792✔
1052
    }
1053
  }
1054

1055
  return NULL;
667,629✔
1056
}
1057

1058
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
667,629✔
1059
  int32_t      code = 0;
667,629✔
1060
  TdThreadAttr thAttr;
663,868✔
1061
  (void)taosThreadAttrInit(&thAttr);
667,629✔
1062
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
667,629✔
1063
#ifdef TD_COMPACT_OS
1064
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
1065
#endif
1066
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
667,629✔
1067
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1068
    dError("failed to create vnode timer thread since %s", tstrerror(code));
×
1069
    return code;
×
1070
  }
1071

1072
  (void)taosThreadAttrDestroy(&thAttr);
667,629✔
1073
  return 0;
667,629✔
1074
}
1075

1076
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
667,629✔
1077
  pMgmt->stop = true;
667,629✔
1078
  if (taosCheckPthreadValid(pMgmt->thread)) {
667,629✔
1079
    (void)taosThreadJoin(pMgmt->thread, NULL);
667,629✔
1080
    taosThreadClear(&pMgmt->thread);
667,629✔
1081
  }
1082
}
667,629✔
1083

1084
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
667,629✔
1085
  int32_t code = -1;
667,629✔
1086

1087
  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
667,629✔
1088
  if (pMgmt == NULL) {
667,629✔
1089
    code = terrno;
×
1090
    goto _OVER;
×
1091
  }
1092

1093
  pMgmt->pData = pInput->pData;
667,629✔
1094
  pMgmt->path = pInput->path;
667,629✔
1095
  pMgmt->name = pInput->name;
667,629✔
1096
  pMgmt->msgCb = pInput->msgCb;
667,629✔
1097
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
667,629✔
1098
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
667,629✔
1099
  pMgmt->msgCb.mgmt = pMgmt;
667,629✔
1100

1101
  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
667,629✔
1102
  if (code != 0) {
667,629✔
1103
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1104
    goto _OVER;
×
1105
  }
1106

1107
  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
667,629✔
1108
  if (code != 0) {
667,629✔
1109
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1110
    goto _OVER;
×
1111
  }
1112

1113
  pMgmt->pTfs = pInput->pTfs;
667,629✔
1114
  if (pMgmt->pTfs == NULL) {
667,629✔
1115
    dError("tfs is null.");
×
1116
    goto _OVER;
×
1117
  }
1118
#ifdef USE_MOUNT
1119
  if (!(pMgmt->mountTfsHash =
667,629✔
1120
            taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK))) {
667,629✔
1121
    dError("failed to init mountTfsHash since %s", terrstr());
×
1122
    return TSDB_CODE_OUT_OF_MEMORY;
×
1123
  }
1124
  if ((code = vmOpenMountTfs(pMgmt)) != 0) {
667,629✔
1125
    goto _OVER;
×
1126
  }
1127
#endif
1128
  tmsgReportStartup("vnode-tfs", "initialized");
667,629✔
1129
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
667,629✔
1130
    dError("failed to init wal since %s", tstrerror(code));
×
1131
    goto _OVER;
×
1132
  }
1133

1134
  tmsgReportStartup("vnode-wal", "initialized");
667,629✔
1135

1136
  if ((code = syncInit()) != 0) {
667,629✔
1137
    dError("failed to open sync since %s", tstrerror(code));
×
1138
    goto _OVER;
×
1139
  }
1140
  tmsgReportStartup("vnode-sync", "initialized");
667,629✔
1141

1142
  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
667,629✔
1143
    dError("failed to init vnode since %s", tstrerror(code));
×
1144
    goto _OVER;
×
1145
  }
1146
  tmsgReportStartup("vnode-commit", "initialized");
667,629✔
1147

1148
  if ((code = vmStartWorker(pMgmt)) != 0) {
667,629✔
1149
    dError("failed to init workers since %s", tstrerror(code));
×
1150
    goto _OVER;
×
1151
  }
1152
  tmsgReportStartup("vnode-worker", "initialized");
667,629✔
1153

1154
  if ((code = vmOpenVnodes(pMgmt)) != 0) {
667,629✔
1155
    dError("failed to open all vnodes since %s", tstrerror(code));
×
1156
    goto _OVER;
×
1157
  }
1158
  tmsgReportStartup("vnode-vnodes", "initialized");
667,629✔
1159

1160
  if ((code = udfcOpen()) != 0) {
667,629✔
1161
    dError("failed to open udfc in vnode since %s", tstrerror(code));
×
1162
    goto _OVER;
×
1163
  }
1164

1165
  code = 0;
667,629✔
1166

1167
_OVER:
667,629✔
1168
  if (code == 0) {
667,629✔
1169
    pOutput->pMgmt = pMgmt;
667,629✔
1170
  } else {
1171
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
×
1172
    vmCleanup(pMgmt);
×
1173
  }
1174

1175
  return code;
667,629✔
1176
}
1177

1178
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
670,849✔
1179
  *required = tsNumOfSupportVnodes > 0;
670,849✔
1180
  return 0;
670,849✔
1181
}
1182

1183
static void *vmRestoreVnodeInThread(void *param) {
412,892✔
1184
  SVnodeThread *pThread = param;
412,892✔
1185
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
412,892✔
1186

1187
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
412,892✔
1188
  setThreadName("restore-vnodes");
412,892✔
1189

1190
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
829,830✔
1191
    SVnodeObj *pVnode = pThread->ppVnodes[v];
416,938✔
1192
    if (pVnode->failed) {
416,938✔
1193
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
1194
      continue;
×
1195
    }
1196

1197
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
416,938✔
1198
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
416,938✔
1199
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
1200
    tmsgReportStartup("vnode-restore", stepDesc);
416,938✔
1201

1202
    int32_t code = vnodeStart(pVnode->pImpl);
416,647✔
1203
    if (code != 0) {
416,938✔
1204
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1205
      pThread->failed++;
×
1206
    } else {
1207
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
416,938✔
1208
      pThread->opened++;
416,938✔
1209
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
416,938✔
1210
    }
1211
  }
1212

1213
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
412,892✔
1214
        pThread->failed);
1215
  return NULL;
412,892✔
1216
}
1217

1218
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
667,629✔
1219
  int32_t     code = 0;
667,629✔
1220
  int32_t     numOfVnodes = 0;
667,629✔
1221
  SVnodeObj **ppVnodes = NULL;
667,629✔
1222
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
667,629✔
1223
  if (code != 0) {
667,629✔
1224
    dError("failed to get vnode list since %s", tstrerror(code));
×
1225
    return code;
×
1226
  }
1227

1228
  int32_t threadNum = tsNumOfCores / 2;
667,629✔
1229
  if (threadNum < 1) threadNum = 1;
667,629✔
1230
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
667,629✔
1231

1232
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
667,629✔
1233
  if (threads == NULL) {
667,629✔
1234
    return terrno;
×
1235
  }
1236

1237
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
1238
    threads[t].threadIndex = t;
13,352,580✔
1239
    threads[t].pMgmt = pMgmt;
13,352,580✔
1240
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
13,352,580✔
1241
    if (threads[t].ppVnodes == NULL) {
13,352,580✔
1242
      code = terrno;
×
1243
      break;
×
1244
    }
1245
  }
1246

1247
  for (int32_t v = 0; v < numOfVnodes; ++v) {
1,084,567✔
1248
    int32_t       t = v % threadNum;
416,938✔
1249
    SVnodeThread *pThread = &threads[t];
416,938✔
1250
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
416,938✔
1251
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
416,938✔
1252
    }
1253
  }
1254

1255
  pMgmt->state.openVnodes = 0;
667,629✔
1256
  pMgmt->state.dropVnodes = 0;
667,629✔
1257
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
667,629✔
1258

1259
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
1260
    SVnodeThread *pThread = &threads[t];
13,352,580✔
1261
    if (pThread->vnodeNum == 0) continue;
13,352,580✔
1262

1263
    TdThreadAttr thAttr;
411,972✔
1264
    (void)taosThreadAttrInit(&thAttr);
412,892✔
1265
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
412,892✔
1266
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
412,892✔
1267
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
1268
    }
1269

1270
    (void)taosThreadAttrDestroy(&thAttr);
412,892✔
1271
  }
1272

1273
  for (int32_t t = 0; t < threadNum; ++t) {
14,020,209✔
1274
    SVnodeThread *pThread = &threads[t];
13,352,580✔
1275
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
13,352,580✔
1276
      (void)taosThreadJoin(pThread->thread, NULL);
412,892✔
1277
      taosThreadClear(&pThread->thread);
412,892✔
1278
    }
1279
    taosMemoryFree(pThread->ppVnodes);
13,352,580✔
1280
  }
1281
  taosMemoryFree(threads);
667,629✔
1282

1283
  for (int32_t i = 0; i < numOfVnodes; ++i) {
1,084,567✔
1284
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
416,938✔
1285
    vmReleaseVnode(pMgmt, ppVnodes[i]);
416,938✔
1286
  }
1287

1288
  if (ppVnodes != NULL) {
667,629✔
1289
    taosMemoryFree(ppVnodes);
667,629✔
1290
  }
1291

1292
  return vmInitTimer(pMgmt);
667,629✔
1293

1294
_exit:
1295
  for (int32_t t = 0; t < threadNum; ++t) {
1296
    SVnodeThread *pThread = &threads[t];
1297
    taosMemoryFree(pThread->ppVnodes);
1298
  }
1299
  taosMemoryFree(threads);
1300
  return code;
1301
}
1302

1303
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
667,629✔
1304

1305
SMgmtFunc vmGetMgmtFunc() {
670,849✔
1306
  SMgmtFunc mgmtFunc = {0};
670,849✔
1307
  mgmtFunc.openFp = vmInit;
670,849✔
1308
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
670,849✔
1309
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
670,849✔
1310
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
670,849✔
1311
  mgmtFunc.requiredFp = vmRequire;
670,849✔
1312
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
670,849✔
1313

1314
  return mgmtFunc;
670,849✔
1315
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc