• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4822

27 Oct 2025 05:42AM UTC coverage: 59.732% (+1.0%) from 58.728%
#4822

push

travis-ci

web-flow
Merge pull request #33377 from taosdata/fix/main/rename-udf-path

fix: update UDF example links to correct file paths

121214 of 258518 branches covered (46.89%)

Branch coverage included in aggregate %.

193636 of 268583 relevant lines covered (72.1%)

4002399.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.59
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
6,880✔
24
  int32_t    diskId = -1;
6,880✔
25
  SVnodeObj *pVnode = NULL;
6,880✔
26

27
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
6,880✔
28
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
6,895✔
29
  if (pVnode != NULL) {
6,892!
30
    diskId = pVnode->diskPrimary;
×
31
  }
32
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
6,892✔
33
  return diskId;
6,893✔
34
}
35

36
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
17,780✔
37
  if (!ppVnode || !(*ppVnode)) return;
17,780!
38

39
  SVnodeObj *pVnode = *ppVnode;
17,780✔
40

41
  int32_t refCount = atomic_load_32(&pVnode->refCount);
17,780✔
42
  while (refCount > 0) {
17,780!
43
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
×
44
    taosMsleep(200);
×
45
    refCount = atomic_load_32(&pVnode->refCount);
×
46
  }
47

48
  taosMemoryFree(pVnode->path);
17,780!
49
  taosMemoryFree(pVnode);
17,780!
50
  ppVnode[0] = NULL;
17,780✔
51
}
52

53
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
6,896✔
54
  int32_t    code = 0;
6,896✔
55
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
6,896!
56
  if (pCreatingVnode == NULL) {
6,896!
57
    dError("failed to alloc vnode since %s", terrstr());
×
58
    return terrno;
×
59
  }
60
  (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
6,896✔
61

62
  pCreatingVnode->vgId = vgId;
6,896✔
63
  pCreatingVnode->diskPrimary = diskId;
6,896✔
64

65
  code = taosThreadRwlockWrlock(&pMgmt->hashLock);
6,896✔
66
  if (code != 0) {
6,896!
67
    taosMemoryFree(pCreatingVnode);
×
68
    return code;
×
69
  }
70

71
  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
6,896✔
72
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
6,896✔
73
  if (code != 0) {
6,896!
74
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
×
75
    taosMemoryFree(pCreatingVnode);
×
76
  }
77

78
  int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
6,896✔
79
  if (r != 0) {
6,896!
80
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
81
  }
82

83
  return code;
6,896✔
84
}
85

86
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
6,904✔
87
  SVnodeObj *pOld = NULL;
6,904✔
88

89
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
6,904✔
90
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
6,904✔
91
  if (r != 0) {
6,904!
92
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
×
93
  }
94
  dTrace("vgId:%d, remove from creating Hash", vgId);
6,904✔
95
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
6,904✔
96
  if (r != 0) {
6,904✔
97
    dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
8!
98
  }
99
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
6,904✔
100

101
  if (pOld) {
6,904✔
102
    dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
6,896✔
103
    vmFreeVnodeObj(&pOld);
6,896✔
104
  }
105

106
_OVER:
8✔
107
  if (r != 0) {
6,904✔
108
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
8!
109
  }
110
}
6,904✔
111

112
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
6,881✔
113
  int32_t code = 0;
6,881✔
114
  STfs   *pTfs = pMgmt->pTfs;
6,881✔
115
  int32_t diskId = 0;
6,881✔
116
  if (!pTfs) {
6,881!
117
    return diskId;
×
118
  }
119

120
  // search fs
121
  char vnodePath[TSDB_FILENAME_LEN] = {0};
6,881✔
122
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
6,881✔
123
  char fname[TSDB_FILENAME_LEN] = {0};
6,881✔
124
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
6,881✔
125
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
6,881✔
126
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
6,881✔
127

128
  diskId = tfsSearch(pTfs, 0, fname);
6,881✔
129
  if (diskId >= 0) {
6,881!
130
    return diskId;
×
131
  }
132
  diskId = tfsSearch(pTfs, 0, fnameTmp);
6,881✔
133
  if (diskId >= 0) {
6,888!
134
    return diskId;
×
135
  }
136

137
  // alloc
138
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
6,888✔
139
  int32_t     numOfVnodes = 0;
6,888✔
140
  SVnodeObj **ppVnodes = NULL;
6,888✔
141

142
  code = taosThreadMutexLock(&pMgmt->mutex);
6,888✔
143
  if (code != 0) {
6,896!
144
    return code;
×
145
  }
146

147
  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
6,896✔
148
  if (code != 0) {
6,896!
149
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
150
    if (r != 0) {
×
151
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
152
    }
153
    return code;
×
154
  }
155

156
  for (int32_t v = 0; v < numOfVnodes; v++) {
30,032✔
157
    SVnodeObj *pVnode = ppVnodes[v];
23,136✔
158
    disks[pVnode->diskPrimary] += 1;
23,136✔
159
  }
160

161
  int32_t minVal = INT_MAX;
6,896✔
162
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
6,896✔
163
  diskId = 0;
6,896✔
164
  for (int32_t id = 0; id < ndisk; id++) {
14,157✔
165
    if (minVal > disks[id]) {
7,261✔
166
      minVal = disks[id];
6,978✔
167
      diskId = id;
6,978✔
168
    }
169
  }
170
  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
6,896✔
171
  if (code != 0) {
6,896!
172
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
173
    if (r != 0) {
×
174
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
175
    }
176
    goto _OVER;
×
177
  }
178

179
  code = taosThreadMutexUnlock(&pMgmt->mutex);
6,896✔
180
  if (code != 0) {
6,896!
181
    goto _OVER;
×
182
  }
183

184
_OVER:
6,896✔
185

186
  for (int32_t i = 0; i < numOfVnodes; ++i) {
30,032✔
187
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
23,135!
188
    vmReleaseVnode(pMgmt, ppVnodes[i]);
23,135✔
189
  }
190
  if (ppVnodes != NULL) {
6,897✔
191
    taosMemoryFree(ppVnodes);
6,896!
192
  }
193

194
  if (code != 0) {
6,896!
195
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
×
196
    return code;
×
197
  } else {
198
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
6,896!
199
    return diskId;
6,896✔
200
  }
201
}
202

203
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); }
6,904✔
204

205
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
4,864,445✔
206
  SVnodeObj *pVnode = NULL;
4,864,445✔
207

208
  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
4,864,445✔
209
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
4,864,975✔
210
  if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
4,864,915!
211
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
34,765✔
212
    dDebug("vgId:%d, acquire vnode failed.", vgId);
34,782✔
213
    pVnode = NULL;
34,783✔
214
  } else {
215
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
4,830,150✔
216
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
4,830,187✔
217
  }
218
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
4,864,970✔
219

220
  return pVnode;
4,864,917✔
221
}
222

223
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVnodeImpl(pMgmt, vgId, true); }
4,854,308✔
224

225
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
4,917,703✔
226
  if (pVnode == NULL) return;
4,917,703✔
227

228
  //(void)taosThreadRwlockRdlock(&pMgmt->lock);
229
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
4,917,695✔
230
  dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
4,917,925✔
231
  //(void)taosThreadRwlockUnlock(&pMgmt->lock);
232
}
233

234
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
9,497✔
235
  SVnodeObj *pOld = NULL;
9,497✔
236
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);
9,497!
237

238
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
9,497✔
239
  if (r != 0) {
9,497!
240
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
×
241
  }
242
  if (pOld) {
9,497!
243
    vmFreeVnodeObj(&pOld);
×
244
  }
245
  int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
9,497✔
246

247
  return code;
9,497✔
248
}
249

250
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
9,497✔
251
  dInfo("vgId:%d, remove from hash", vgId);
9,497!
252
  int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
9,497✔
253
  if (r != 0) {
9,497!
254
    dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r));
×
255
  }
256
}
9,497✔
257

258
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
1,387✔
259
  int32_t    code = 0;
1,387✔
260
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
1,387!
261
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
1,387!
262
  if (pClosedVnode == NULL) {
1,387!
263
    dError("failed to alloc vnode since %s", terrstr());
×
264
    return terrno;
×
265
  }
266
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
1,387✔
267

268
  pClosedVnode->vgId = pVnode->vgId;
1,387✔
269
  pClosedVnode->dropped = pVnode->dropped;
1,387✔
270
  pClosedVnode->vgVersion = pVnode->vgVersion;
1,387✔
271
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
1,387✔
272
  pClosedVnode->toVgId = pVnode->toVgId;
1,387✔
273
  pClosedVnode->mountId = pVnode->mountId;
1,387✔
274

275
  SVnodeObj *pOld = NULL;
1,387✔
276
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
1,387✔
277
  if (r != 0) {
1,387!
278
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
279
  }
280
  if (pOld) {
1,387!
281
    vmFreeVnodeObj(&pOld);
×
282
  }
283
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
1,387!
284
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
1,387✔
285
  if (r != 0) {
1,387!
286
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
×
287
  }
288

289
  return code;
1,387✔
290
}
291

292
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
9,497✔
293
  SVnodeObj *pOld = NULL;
9,497✔
294
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);
9,497!
295
  int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
9,497✔
296
  if (r != 0) {
9,497!
297
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
298
  }
299
  if (pOld != NULL) {
9,497✔
300
    vmFreeVnodeObj(&pOld);
1,387✔
301
    dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
1,387!
302
    r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
1,387✔
303
    if (r != 0) {
1,387!
304
      dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
×
305
    }
306
  }
307
}
9,497✔
308
#ifdef USE_MOUNT
309
int32_t vmAcquireMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, const char *mountName, const char *mountPath,
16✔
310
                          STfs **ppTfs) {
311
  int32_t    code = 0, lino = 0;
16✔
312
  TdFilePtr  pFile = NULL;
16✔
313
  SArray    *pDisks = NULL;
16✔
314
  SMountTfs *pMountTfs = NULL;
16✔
315
  bool       unlock = false;
16✔
316

317
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
16✔
318
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
16!
319
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
8!
320
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
321
    }
322
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
8✔
323
    TAOS_RETURN(code);
8✔
324
  }
325
  if (!mountPath || mountPath[0] == 0 || mountId == 0) {
8!
326
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
327
  }
328
  (void)(taosThreadMutexLock(&pMgmt->mutex));
8✔
329
  unlock = true;
8✔
330
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
8✔
331
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
8!
332
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
6!
333
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
334
    }
335
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
6✔
336
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
6✔
337
    TAOS_RETURN(code);
6✔
338
  }
339

340
  TAOS_CHECK_EXIT(vmMountCheckRunning(mountName, mountPath, &pFile, 3));
2!
341
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, mountPath, &pDisks));
2!
342
  int32_t numOfDisks = taosArrayGetSize(pDisks);
2✔
343
  if (numOfDisks <= 0) {
2!
344
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
345
  }
346
  TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
2!
347
  if (mountName) (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", mountName);
2!
348
  if (mountPath) (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", mountPath);
2!
349
  pMountTfs->pFile = pFile;
2✔
350
  atomic_store_32(&pMountTfs->nRef, 2);  // init and acquire
2✔
351
  TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), numOfDisks, &pMountTfs->pTfs));
2!
352
  TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &mountId, sizeof(mountId), &pMountTfs, POINTER_BYTES));
2!
353
_exit:
2✔
354
  if (unlock) {
2!
355
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
2✔
356
  }
357
  taosArrayDestroy(pDisks);
2✔
358
  if (code != 0) {
2!
359
    dError("mount:%" PRIi64 ",%s, failed at line %d to get mount tfs since %s", mountId, mountPath ? mountPath : "NULL",
×
360
           lino, tstrerror(code));
361
    if (pFile) {
×
362
      (void)taosUnLockFile(pFile);
×
363
      (void)taosCloseFile(&pFile);
×
364
    }
365
    if (pMountTfs) {
×
366
      tfsClose(pMountTfs->pTfs);
×
367
      taosMemoryFree(pMountTfs);
×
368
    }
369
    *ppTfs = NULL;
×
370
  } else {
371
    *ppTfs = pMountTfs->pTfs;
2✔
372
  }
373

374
  TAOS_RETURN(code);
2✔
375
}
376
#endif
377

378
bool vmReleaseMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, int32_t minRef) {
16✔
379
#ifdef USE_MOUNT
380
  SMountTfs *pMountTfs = NULL;
16✔
381
  int32_t    nRef = INT32_MAX, code = 0;
16✔
382

383
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
16✔
384
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
16!
385
    if ((nRef = atomic_sub_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1)) <= minRef) {
16✔
386
      (void)(taosThreadMutexLock(&pMgmt->mutex));
2✔
387
      SMountTfs *pTmp = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
2✔
388
      if (pTmp && *(SMountTfs **)pTmp) {
2!
389
        dInfo("mount:%" PRIi64 ", ref:%d, release mount tfs", mountId, nRef);
2!
390
        tfsClose((*(SMountTfs **)pTmp)->pTfs);
2✔
391
        if ((*(SMountTfs **)pTmp)->pFile) {
2!
392
          (void)taosUnLockFile((*(SMountTfs **)pTmp)->pFile);
2✔
393
          (void)taosCloseFile(&(*(SMountTfs **)pTmp)->pFile);
2✔
394
        }
395
        taosMemoryFree(*(SMountTfs **)pTmp);
2!
396
        if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, sizeof(mountId))) < 0) {
2!
397
          dError("failed at line %d to remove mountId:%" PRIi64 " from mount tfs hash", __LINE__, mountId);
×
398
        }
399
      }
400
      (void)taosThreadMutexUnlock(&pMgmt->mutex);
2✔
401
      return true;
2✔
402
    }
403
  }
404
#endif
405
  return false;
14✔
406
}
407

408
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
9,497✔
409
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
9,497!
410
  if (pVnode == NULL) {
9,497!
411
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
412
    return -1;
×
413
  }
414

415
  pVnode->vgId = pCfg->vgId;
9,497✔
416
  pVnode->vgVersion = pCfg->vgVersion;
9,497✔
417
  pVnode->diskPrimary = pCfg->diskPrimary;
9,497✔
418
  pVnode->mountId = pCfg->mountId;
9,497✔
419
  pVnode->refCount = 0;
9,497✔
420
  pVnode->dropped = 0;
9,497✔
421
  pVnode->failed = 0;
9,497✔
422
  pVnode->path = taosStrdup(pCfg->path);
9,497!
423
  pVnode->pImpl = pImpl;
9,497✔
424

425
  if (pVnode->path == NULL) {
9,497!
426
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
427
    taosMemoryFree(pVnode);
×
428
    return -1;
×
429
  }
430

431
  if (pImpl) {
9,497!
432
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
9,497!
433
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
434
      taosMemoryFree(pVnode->path);
×
435
      taosMemoryFree(pVnode);
×
436
      return -1;
×
437
    }
438
  } else {
439
    pVnode->failed = 1;
×
440
  }
441

442
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
9,497✔
443
  int32_t code = vmRegisterRunningState(pMgmt, pVnode);
9,497✔
444
  vmUnRegisterClosedState(pMgmt, pVnode);
9,497✔
445
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
9,497✔
446

447
  TAOS_RETURN(code);
9,497✔
448
}
449

450
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
9,495✔
451
  char path[TSDB_FILENAME_LEN] = {0};
9,495✔
452
  bool atExit = true;
9,495✔
453

454
  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
9,495!
455
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
7,276✔
456
  }
457

458
  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
9,490✔
459
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
9,497✔
460
  if (keepClosed) {
9,497✔
461
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
1,387!
462
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
×
463
      return;
×
464
    };
465
  }
466
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
9,497✔
467

468
  vmReleaseVnode(pMgmt, pVnode);
9,497✔
469

470
  if (pVnode->failed) {
9,497!
471
    goto _closed;
×
472
  }
473
  dInfo("vgId:%d, pre close", pVnode->vgId);
9,497!
474
  vnodePreClose(pVnode->pImpl);
9,497✔
475

476
  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
9,497!
477
  while (pVnode->refCount > 0) taosMsleep(10);
9,497!
478

479
  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
9,497!
480
        taosQueueGetThreadId(pVnode->pWriteW.queue));
481
  tMultiWorkerCleanup(&pVnode->pWriteW);
9,497✔
482

483
  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
9,497!
484
        taosQueueGetThreadId(pVnode->pSyncW.queue));
485
  tMultiWorkerCleanup(&pVnode->pSyncW);
9,497✔
486

487
  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
9,497!
488
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
489
  tMultiWorkerCleanup(&pVnode->pSyncRdW);
9,497✔
490

491
  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
9,497!
492
        taosQueueGetThreadId(pVnode->pApplyW.queue));
493
  tMultiWorkerCleanup(&pVnode->pApplyW);
9,497✔
494

495
  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
9,497!
496
        taosQueueGetThreadId(pVnode->pFetchQ));
497
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
9,497!
498

499
  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
9,497!
500
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
9,497!
501

502
  dInfo("vgId:%d, wait for vnode stream reader queue:%p is empty", pVnode->vgId, pVnode->pStreamReaderQ);
9,497!
503
  while (!taosQueueEmpty(pVnode->pStreamReaderQ)) taosMsleep(10);
9,497!
504

505
  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
9,497!
506

507
  dInfo("vgId:%d, post close", pVnode->vgId);
9,497!
508
  vnodePostClose(pVnode->pImpl);
9,497✔
509

510
  vmFreeQueue(pMgmt, pVnode);
9,497✔
511

512
  if (commitAndRemoveWal) {
9,497✔
513
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
65!
514
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
65!
515
      dError("vgId:%d, failed to commit data", pVnode->vgId);
×
516
    }
517
    if (vnodeBegin(pVnode->pImpl) != 0) {
65!
518
      dError("vgId:%d, failed to begin", pVnode->vgId);
×
519
    }
520
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
65!
521
  }
522

523
  int32_t nodeId = vnodeNodeId(pVnode->pImpl);
9,497✔
524
  vnodeClose(pVnode->pImpl);
9,497✔
525
  pVnode->pImpl = NULL;
9,492✔
526

527
_closed:
9,492✔
528
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);
9,492!
529

530
  if (commitAndRemoveWal) {
9,496✔
531
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
65✔
532
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
65!
533
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
65!
534
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
×
535
    }
536
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
65!
537
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
×
538
    }
539
  }
540

541
  if (pVnode->dropped) {
9,496✔
542
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
3,079!
543
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
3,079✔
544
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
3,079✔
545
  }
546
  if (pVnode->mountId && vmReleaseMountTfs(pMgmt, pVnode->mountId, pVnode->dropped ? 1 : 0)) {
9,496✔
547
    if (vmWriteMountListToFile(pMgmt) != 0) {
2!
548
      dError("vgId:%d, failed at line %d to write mount list since %s", pVnode->vgId, __LINE__, terrstr());
×
549
    }
550
  }
551

552
  vmFreeVnodeObj(&pVnode);
9,496✔
553
}
554

555
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
×
556
  int32_t r = 0;
×
557
  r = taosThreadRwlockWrlock(&pMgmt->hashLock);
×
558
  if (r != 0) {
×
559
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
×
560
  }
561
  if (r == 0) {
×
562
    vmUnRegisterRunningState(pMgmt, vgId);
×
563
  }
564
  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
×
565
  if (r != 0) {
×
566
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
567
  }
568
}
×
569

570
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
×
571
  int32_t srcVgId = pCfg->vgId;
×
572
  int32_t dstVgId = pCfg->toVgId;
×
573
  if (dstVgId == 0) return 0;
×
574

575
  char srcPath[TSDB_FILENAME_LEN];
576
  char dstPath[TSDB_FILENAME_LEN];
577

578
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
×
579
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
×
580

581
  int32_t diskPrimary = pCfg->diskPrimary;
×
582
  int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
×
583
  if (vgId <= 0) {
×
584
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
×
585
    return -1;
×
586
  }
587

588
  pCfg->vgId = vgId;
×
589
  pCfg->toVgId = 0;
×
590
  return 0;
×
591
}
592

593
static void *vmOpenVnodeInThread(void *param) {
1,117✔
594
  SVnodeThread *pThread = param;
1,117✔
595
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,117✔
596
  char          path[TSDB_FILENAME_LEN];
597

598
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,117!
599
  setThreadName("open-vnodes");
1,117✔
600

601
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,257✔
602
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,140✔
603
    if (pCfg->dropped) {
1,140!
604
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
605
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
606
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
607
      tmsgReportStartup("vnode-destroy", stepDesc);
×
608

609
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
610
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
611
      pThread->updateVnodesList = true;
×
612
      pThread->dropped++;
×
613
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
614
      continue;
×
615
    }
616

617
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,140✔
618
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,140✔
619
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
620
    tmsgReportStartup("vnode-open", stepDesc);
1,140✔
621

622
    if (pCfg->toVgId) {
1,141✔
623
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
1!
624
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
625
        pThread->failed++;
×
626
        continue;
×
627
      }
628
      pThread->updateVnodesList = true;
×
629
    }
630

631
    int32_t diskPrimary = pCfg->mountId == 0 ? pCfg->diskPrimary : 0;
1,140✔
632
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,140✔
633

634
    STfs *pMountTfs = NULL;
1,140✔
635
#ifdef USE_MOUNT
636
    bool releaseTfs = false;
1,140✔
637
    if (pCfg->mountId) {
1,140✔
638
      if (vmAcquireMountTfs(pMgmt, pCfg->mountId, NULL, NULL, &pMountTfs) != 0) {
8!
639
        dError("vgId:%d, failed to get mount tfs by thread:%d", pCfg->vgId, pThread->threadIndex);
×
640
        pThread->failed++;
×
641
        continue;
×
642
      }
643
      releaseTfs = true;
8✔
644
    }
645
#endif
646

647
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, false);
1,140✔
648

649
    if (pImpl == NULL) {
1,141!
650
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
651
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
652
        pThread->failed++;
×
653
#ifdef USE_MOUNT
654
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
655
#endif
656
        continue;
×
657
      }
658
    }
659

660
    if (pImpl != NULL) {
1,141!
661
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,141!
662
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
663
        pThread->failed++;
×
664
#ifdef USE_MOUNT
665
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
666
#endif
667
        continue;
×
668
      }
669
    }
670

671
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,141!
672
    pThread->opened++;
1,141✔
673
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,141✔
674
  }
675

676
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,117!
677
        pThread->opened, pThread->dropped, pThread->failed);
678
  return NULL;
1,117✔
679
}
680

681
#ifdef USE_MOUNT
682
static int32_t vmOpenMountTfs(SVnodeMgmt *pMgmt) {
1,746✔
683
  int32_t    code = 0, lino = 0;
1,746✔
684
  int32_t    numOfMounts = 0;
1,746✔
685
  SMountCfg *pMountCfgs = NULL;
1,746✔
686
  SArray    *pDisks = NULL;
1,746✔
687
  TdFilePtr  pFile = NULL;
1,746✔
688
  SMountTfs *pMountTfs = NULL;
1,746✔
689

690
  TAOS_CHECK_EXIT(vmGetMountListFromFile(pMgmt, &pMountCfgs, &numOfMounts));
1,746!
691
  for (int32_t i = 0; i < numOfMounts; ++i) {
1,748✔
692
    SMountCfg *pCfg = &pMountCfgs[i];
2✔
693
    if (taosHashGet(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId))) {
2!
694
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
695
    }
696
    TAOS_CHECK_EXIT(vmMountCheckRunning(pCfg->name, pCfg->path, &pFile, 3));
2!
697
    TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pCfg->path, &pDisks));
2!
698
    int32_t nDisks = taosArrayGetSize(pDisks);
2✔
699
    if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
2!
700
      dError("mount:%s, %" PRIi64 ", %s, invalid number of disks:%d, expect 1 to %d", pCfg->name, pCfg->mountId,
×
701
             pCfg->path, nDisks, TFS_MAX_DISKS);
702
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
703
    }
704

705
    TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
2!
706
    TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), TARRAY_SIZE(pDisks), &pMountTfs->pTfs));
2!
707
    (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", pCfg->name);
2✔
708
    (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", pCfg->path);
2✔
709
    pMountTfs->pFile = pFile;
2✔
710
    pMountTfs->nRef = 1;
2✔
711
    TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId), &pMountTfs, POINTER_BYTES));
2!
712
    taosArrayDestroy(pDisks);
2✔
713
    pDisks = NULL;
2✔
714
    pMountTfs = NULL;
2✔
715
    pFile = NULL;
2✔
716
  }
717
_exit:
1,746✔
718
  if (code != 0) {
1,746!
719
    dError("failed to open mount tfs at line %d since %s", lino, tstrerror(code));
×
720
    if (pFile) {
×
721
      (void)taosUnLockFile(pFile);
×
722
      (void)taosCloseFile(&pFile);
×
723
    }
724
    if (pMountTfs) {
×
725
      tfsClose(pMountTfs->pTfs);
×
726
      taosMemoryFree(pMountTfs);
×
727
    }
728
    taosArrayDestroy(pDisks);
×
729
  }
730
  taosMemoryFree(pMountCfgs);
1,746!
731
  TAOS_RETURN(code);
1,746✔
732
}
733
#endif
734
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
1,746✔
735
  pMgmt->runngingHash =
1,746✔
736
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,746✔
737
  if (pMgmt->runngingHash == NULL) {
1,746!
738
    dError("failed to init vnode hash since %s", terrstr());
×
739
    return TSDB_CODE_OUT_OF_MEMORY;
×
740
  }
741

742
  pMgmt->closedHash =
1,746✔
743
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,746✔
744
  if (pMgmt->closedHash == NULL) {
1,746!
745
    dError("failed to init vnode closed hash since %s", terrstr());
×
746
    return TSDB_CODE_OUT_OF_MEMORY;
×
747
  }
748

749
  pMgmt->creatingHash =
1,746✔
750
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,746✔
751
  if (pMgmt->creatingHash == NULL) {
1,746!
752
    dError("failed to init vnode creatingHash hash since %s", terrstr());
×
753
    return TSDB_CODE_OUT_OF_MEMORY;
×
754
  }
755

756
  SWrapperCfg *pCfgs = NULL;
1,746✔
757
  int32_t      numOfVnodes = 0;
1,746✔
758
  int32_t      code = 0;
1,746✔
759
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
1,746!
760
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
×
761
    return code;
×
762
  }
763

764
  pMgmt->state.totalVnodes = numOfVnodes;
1,746✔
765

766
  int32_t threadNum = tsNumOfCores / 2;
1,746✔
767
  if (threadNum < 1) threadNum = 1;
1,746!
768
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
1,746✔
769

770
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
1,746!
771
  if (threads == NULL) {
1,746!
772
    dError("failed to allocate memory for threads since %s", terrstr());
×
773
    taosMemoryFree(pCfgs);
×
774
    return terrno;
×
775
  }
776

777
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
778
    threads[t].threadIndex = t;
34,920✔
779
    threads[t].pMgmt = pMgmt;
34,920✔
780
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
34,920!
781
  }
782

783
  for (int32_t v = 0; v < numOfVnodes; ++v) {
2,887✔
784
    int32_t       t = v % threadNum;
1,141✔
785
    SVnodeThread *pThread = &threads[t];
1,141✔
786
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
1,141✔
787
  }
788

789
  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);
1,746!
790

791
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
792
    SVnodeThread *pThread = &threads[t];
34,920✔
793
    if (pThread->vnodeNum == 0) continue;
34,920✔
794

795
    TdThreadAttr thAttr;
796
    (void)taosThreadAttrInit(&thAttr);
1,117✔
797
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,117✔
798
#ifdef TD_COMPACT_OS
799
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
800
#endif
801
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
1,117!
802
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
×
803
    }
804

805
    (void)taosThreadAttrDestroy(&thAttr);
1,117✔
806
  }
807

808
  bool updateVnodesList = false;
1,746✔
809

810
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
811
    SVnodeThread *pThread = &threads[t];
34,920✔
812
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
34,920!
813
      (void)taosThreadJoin(pThread->thread, NULL);
1,117✔
814
      taosThreadClear(&pThread->thread);
1,117✔
815
    }
816
    taosMemoryFree(pThread->pCfgs);
34,920!
817
    if (pThread->updateVnodesList) updateVnodesList = true;
34,920!
818
  }
819
  taosMemoryFree(threads);
1,746!
820
  taosMemoryFree(pCfgs);
1,746!
821

822
  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
1,746!
823
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
×
824
    return terrno = TSDB_CODE_VND_INIT_FAILED;
×
825
  }
826

827
  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
1,746!
828
    dError("failed to write vnode list since %s", tstrerror(code));
×
829
    return code;
×
830
  }
831

832
#ifdef USE_MOUNT
833
  bool  updateMountList = false;
1,746✔
834
  void *pIter = NULL;
1,746✔
835
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
1,748✔
836
    SMountTfs *pMountTfs = *(SMountTfs **)pIter;
2✔
837
    if (pMountTfs && atomic_load_32(&pMountTfs->nRef) <= 1) {
2!
838
      size_t  keyLen = 0;
×
839
      int64_t mountId = *(int64_t *)taosHashGetKey(pIter, &keyLen);
×
840
      dInfo("mount:%s, %s, %" PRIi64 ", ref:%d, remove unused mount tfs", pMountTfs->name, pMountTfs->path, mountId,
×
841
            atomic_load_32(&pMountTfs->nRef));
842
      if (pMountTfs->pFile) {
×
843
        (void)taosUnLockFile(pMountTfs->pFile);
×
844
        (void)taosCloseFile(&pMountTfs->pFile);
×
845
      }
846
      tfsClose(pMountTfs->pTfs);
×
847
      taosMemoryFree(pMountTfs);
×
848
      if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, keyLen)) != 0) {
×
849
        dWarn("failed at line %d to remove mount:%s, %s, %" PRIi64 " from mount tfs hash since %s", __LINE__,
×
850
              pMountTfs->name, pMountTfs->path, mountId, tstrerror(code));
851
      }
852
      updateMountList = true;
×
853
    }
854
  }
855
  if (updateMountList && (code = vmWriteMountListToFile(pMgmt)) != 0) {
1,746!
856
    dError("failed to write mount list at line %d since %s", __LINE__, tstrerror(code));
×
857
    return code;
×
858
  }
859
#endif
860

861
  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
1,746!
862
  return 0;
1,746✔
863
}
864

865
static void *vmCloseVnodeInThread(void *param) {
4,801✔
866
  SVnodeThread *pThread = param;
4,801✔
867
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
4,801✔
868

869
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
4,801✔
870
  setThreadName("close-vnodes");
4,803✔
871

872
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
9,768✔
873
    SVnodeObj *pVnode = pThread->ppVnodes[v];
4,966✔
874

875
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
4,966✔
876
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
4,966✔
877
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
878
    tmsgReportStartup("vnode-close", stepDesc);
4,966✔
879

880
    vmCloseVnode(pMgmt, pVnode, false, false);
4,964✔
881
  }
882

883
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
4,802!
884
  return NULL;
4,802✔
885
}
886

887
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
1,746✔
888
  int32_t code = 0;
1,746✔
889
  dInfo("start to close all vnodes");
1,746!
890
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
1,746✔
891
  dInfo("vnodes mgmt worker is stopped");
1,746!
892
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
1,746✔
893
  dInfo("vnodes multiple mgmt worker is stopped");
1,746!
894

895
  int32_t     numOfVnodes = 0;
1,746✔
896
  SVnodeObj **ppVnodes = NULL;
1,746✔
897
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
1,746✔
898
  if (code != 0) {
1,746!
899
    dError("failed to get vnode list since %s", tstrerror(code));
×
900
    return;
×
901
  }
902

903
  int32_t threadNum = tsNumOfCores / 2;
1,746✔
904
  if (threadNum < 1) threadNum = 1;
1,746!
905
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
1,746✔
906

907
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
1,746!
908
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
909
    threads[t].threadIndex = t;
34,920✔
910
    threads[t].pMgmt = pMgmt;
34,920✔
911
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
34,920!
912
  }
913

914
  for (int32_t v = 0; v < numOfVnodes; ++v) {
6,712✔
915
    int32_t       t = v % threadNum;
4,966✔
916
    SVnodeThread *pThread = &threads[t];
4,966✔
917
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
4,966!
918
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
4,966✔
919
    }
920
  }
921

922
  pMgmt->state.openVnodes = 0;
1,746✔
923
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
1,746!
924

925
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
926
    SVnodeThread *pThread = &threads[t];
34,920✔
927
    if (pThread->vnodeNum == 0) continue;
34,920✔
928

929
    TdThreadAttr thAttr;
930
    (void)taosThreadAttrInit(&thAttr);
4,802✔
931
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
4,802✔
932
#ifdef TD_COMPACT_OS
933
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
934
#endif
935
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
4,802!
936
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
937
    }
938

939
    (void)taosThreadAttrDestroy(&thAttr);
4,802✔
940
  }
941

942
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
943
    SVnodeThread *pThread = &threads[t];
34,920✔
944
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
34,920!
945
      (void)taosThreadJoin(pThread->thread, NULL);
4,802✔
946
      taosThreadClear(&pThread->thread);
4,802✔
947
    }
948
    taosMemoryFree(pThread->ppVnodes);
34,920!
949
  }
950
  taosMemoryFree(threads);
1,746!
951

952
  if (ppVnodes != NULL) {
1,746!
953
    taosMemoryFree(ppVnodes);
1,746!
954
  }
955

956
  if (pMgmt->runngingHash != NULL) {
1,746!
957
    taosHashCleanup(pMgmt->runngingHash);
1,746✔
958
    pMgmt->runngingHash = NULL;
1,746✔
959
  }
960

961
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
1,746✔
962
  while (pIter) {
1,746!
963
    SVnodeObj **ppVnode = pIter;
×
964
    vmFreeVnodeObj(ppVnode);
×
965
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
×
966
  }
967

968
  if (pMgmt->closedHash != NULL) {
1,746!
969
    taosHashCleanup(pMgmt->closedHash);
1,746✔
970
    pMgmt->closedHash = NULL;
1,746✔
971
  }
972

973
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
1,746✔
974
  while (pIter) {
1,746!
975
    SVnodeObj **ppVnode = pIter;
×
976
    vmFreeVnodeObj(ppVnode);
×
977
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
×
978
  }
979

980
  if (pMgmt->creatingHash != NULL) {
1,746!
981
    taosHashCleanup(pMgmt->creatingHash);
1,746✔
982
    pMgmt->creatingHash = NULL;
1,746✔
983
  }
984

985
#ifdef USE_MOUNT
986
  pIter = NULL;
1,746✔
987
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
1,748✔
988
    SMountTfs *mountTfs = *(SMountTfs **)pIter;
2✔
989
    if (mountTfs->pFile) {
2!
990
      (void)taosUnLockFile(mountTfs->pFile);
2✔
991
      (void)taosCloseFile(&mountTfs->pFile);
2✔
992
    }
993
    tfsClose(mountTfs->pTfs);
2✔
994
    taosMemoryFree(mountTfs);
2!
995
  }
996
  taosHashCleanup(pMgmt->mountTfsHash);
1,746✔
997
  pMgmt->mountTfsHash = NULL;
1,746✔
998
#endif
999

1000
  dInfo("total vnodes:%d are all closed", numOfVnodes);
1,746!
1001
}
1002

1003
static void vmCleanup(SVnodeMgmt *pMgmt) {
1,746✔
1004
  vmCloseVnodes(pMgmt);
1,746✔
1005
  vmStopWorker(pMgmt);
1,746✔
1006
  vnodeCleanup();
1,746✔
1007
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
1,746✔
1008
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
1,746✔
1009
  taosMemoryFree(pMgmt);
1,746!
1010
}
1,746✔
1011

1012
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
2,348✔
1013
  int32_t     code = 0;
2,348✔
1014
  int32_t     numOfVnodes = 0;
2,348✔
1015
  SVnodeObj **ppVnodes = NULL;
2,348✔
1016
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
2,348✔
1017
  if (code != 0) {
2,348!
1018
    dError("failed to get vnode list since %s", tstrerror(code));
×
1019
    return;
×
1020
  }
1021

1022
  if (ppVnodes != NULL) {
2,348!
1023
    for (int32_t i = 0; i < numOfVnodes; ++i) {
9,878✔
1024
      SVnodeObj *pVnode = ppVnodes[i];
7,530✔
1025
      if (!pVnode->failed) {
7,530!
1026
        vnodeSyncCheckTimeout(pVnode->pImpl);
7,530✔
1027
      }
1028
      vmReleaseVnode(pMgmt, pVnode);
7,530✔
1029
    }
1030
    taosMemoryFree(ppVnodes);
2,348!
1031
  }
1032
}
1033

1034
static void *vmThreadFp(void *param) {
1,746✔
1035
  SVnodeMgmt *pMgmt = param;
1,746✔
1036
  int64_t     lastTime = 0;
1,746✔
1037
  setThreadName("vnode-timer");
1,746✔
1038

1039
  while (1) {
916,696✔
1040
    lastTime++;
918,442✔
1041
    taosMsleep(100);
918,442✔
1042
    if (pMgmt->stop) break;
918,442✔
1043
    if (lastTime % 10 != 0) continue;
916,696✔
1044

1045
    int64_t sec = lastTime / 10;
90,889✔
1046
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
90,889✔
1047
      vmCheckSyncTimeout(pMgmt);
2,348✔
1048
    }
1049
  }
1050

1051
  return NULL;
1,746✔
1052
}
1053

1054
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
1,746✔
1055
  int32_t      code = 0;
1,746✔
1056
  TdThreadAttr thAttr;
1057
  (void)taosThreadAttrInit(&thAttr);
1,746✔
1058
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,746✔
1059
#ifdef TD_COMPACT_OS
1060
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
1061
#endif
1062
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
1,746!
1063
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1064
    dError("failed to create vnode timer thread since %s", tstrerror(code));
×
1065
    return code;
×
1066
  }
1067

1068
  (void)taosThreadAttrDestroy(&thAttr);
1,746✔
1069
  return 0;
1,746✔
1070
}
1071

1072
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
1,746✔
1073
  pMgmt->stop = true;
1,746✔
1074
  if (taosCheckPthreadValid(pMgmt->thread)) {
1,746!
1075
    (void)taosThreadJoin(pMgmt->thread, NULL);
1,746✔
1076
    taosThreadClear(&pMgmt->thread);
1,746✔
1077
  }
1078
}
1,746✔
1079

1080
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
1,746✔
1081
  int32_t code = -1;
1,746✔
1082

1083
  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
1,746!
1084
  if (pMgmt == NULL) {
1,746!
1085
    code = terrno;
×
1086
    goto _OVER;
×
1087
  }
1088

1089
  pMgmt->pData = pInput->pData;
1,746✔
1090
  pMgmt->path = pInput->path;
1,746✔
1091
  pMgmt->name = pInput->name;
1,746✔
1092
  pMgmt->msgCb = pInput->msgCb;
1,746✔
1093
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
1,746✔
1094
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
1,746✔
1095
  pMgmt->msgCb.mgmt = pMgmt;
1,746✔
1096

1097
  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
1,746✔
1098
  if (code != 0) {
1,746!
1099
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1100
    goto _OVER;
×
1101
  }
1102

1103
  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
1,746✔
1104
  if (code != 0) {
1,746!
1105
    code = TAOS_SYSTEM_ERROR(ERRNO);
×
1106
    goto _OVER;
×
1107
  }
1108

1109
  pMgmt->pTfs = pInput->pTfs;
1,746✔
1110
  if (pMgmt->pTfs == NULL) {
1,746!
1111
    dError("tfs is null.");
×
1112
    goto _OVER;
×
1113
  }
1114
#ifdef USE_MOUNT
1115
  if (!(pMgmt->mountTfsHash =
1,746!
1116
            taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK))) {
1,746✔
1117
    dError("failed to init mountTfsHash since %s", terrstr());
×
1118
    return TSDB_CODE_OUT_OF_MEMORY;
×
1119
  }
1120
  if ((code = vmOpenMountTfs(pMgmt)) != 0) {
1,746!
1121
    goto _OVER;
×
1122
  }
1123
#endif
1124
  tmsgReportStartup("vnode-tfs", "initialized");
1,746✔
1125
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
1,746!
1126
    dError("failed to init wal since %s", tstrerror(code));
×
1127
    goto _OVER;
×
1128
  }
1129

1130
  tmsgReportStartup("vnode-wal", "initialized");
1,746✔
1131

1132
  if ((code = syncInit()) != 0) {
1,746!
1133
    dError("failed to open sync since %s", tstrerror(code));
×
1134
    goto _OVER;
×
1135
  }
1136
  tmsgReportStartup("vnode-sync", "initialized");
1,746✔
1137

1138
  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
1,746!
1139
    dError("failed to init vnode since %s", tstrerror(code));
×
1140
    goto _OVER;
×
1141
  }
1142
  tmsgReportStartup("vnode-commit", "initialized");
1,746✔
1143

1144
  if ((code = vmStartWorker(pMgmt)) != 0) {
1,746!
1145
    dError("failed to init workers since %s", tstrerror(code));
×
1146
    goto _OVER;
×
1147
  }
1148
  tmsgReportStartup("vnode-worker", "initialized");
1,746✔
1149

1150
  if ((code = vmOpenVnodes(pMgmt)) != 0) {
1,746!
1151
    dError("failed to open all vnodes since %s", tstrerror(code));
×
1152
    goto _OVER;
×
1153
  }
1154
  tmsgReportStartup("vnode-vnodes", "initialized");
1,746✔
1155

1156
  if ((code = udfcOpen()) != 0) {
1,746!
1157
    dError("failed to open udfc in vnode since %s", tstrerror(code));
×
1158
    goto _OVER;
×
1159
  }
1160

1161
  code = 0;
1,746✔
1162

1163
_OVER:
1,746✔
1164
  if (code == 0) {
1,746!
1165
    pOutput->pMgmt = pMgmt;
1,746✔
1166
  } else {
1167
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
×
1168
    vmCleanup(pMgmt);
×
1169
  }
1170

1171
  return code;
1,746✔
1172
}
1173

1174
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
1,759✔
1175
  *required = tsNumOfSupportVnodes > 0;
1,759✔
1176
  return 0;
1,759✔
1177
}
1178

1179
static void *vmRestoreVnodeInThread(void *param) {
1,117✔
1180
  SVnodeThread *pThread = param;
1,117✔
1181
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,117✔
1182

1183
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,117!
1184
  setThreadName("restore-vnodes");
1,117✔
1185

1186
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,258✔
1187
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,141✔
1188
    if (pVnode->failed) {
1,141!
1189
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
1190
      continue;
×
1191
    }
1192

1193
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,141✔
1194
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,141✔
1195
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
1196
    tmsgReportStartup("vnode-restore", stepDesc);
1,141✔
1197

1198
    int32_t code = vnodeStart(pVnode->pImpl);
1,141✔
1199
    if (code != 0) {
1,141!
1200
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1201
      pThread->failed++;
×
1202
    } else {
1203
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,141!
1204
      pThread->opened++;
1,141✔
1205
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,141✔
1206
    }
1207
  }
1208

1209
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,117!
1210
        pThread->failed);
1211
  return NULL;
1,117✔
1212
}
1213

1214
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
1,746✔
1215
  int32_t     code = 0;
1,746✔
1216
  int32_t     numOfVnodes = 0;
1,746✔
1217
  SVnodeObj **ppVnodes = NULL;
1,746✔
1218
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
1,746✔
1219
  if (code != 0) {
1,746!
1220
    dError("failed to get vnode list since %s", tstrerror(code));
×
1221
    return code;
×
1222
  }
1223

1224
  int32_t threadNum = tsNumOfCores / 2;
1,746✔
1225
  if (threadNum < 1) threadNum = 1;
1,746!
1226
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
1,746✔
1227

1228
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
1,746!
1229
  if (threads == NULL) {
1,746!
1230
    return terrno;
×
1231
  }
1232

1233
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
1234
    threads[t].threadIndex = t;
34,920✔
1235
    threads[t].pMgmt = pMgmt;
34,920✔
1236
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
34,920!
1237
    if (threads[t].ppVnodes == NULL) {
34,920!
1238
      code = terrno;
×
1239
      break;
×
1240
    }
1241
  }
1242

1243
  for (int32_t v = 0; v < numOfVnodes; ++v) {
2,887✔
1244
    int32_t       t = v % threadNum;
1,141✔
1245
    SVnodeThread *pThread = &threads[t];
1,141✔
1246
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
1,141!
1247
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
1,141✔
1248
    }
1249
  }
1250

1251
  pMgmt->state.openVnodes = 0;
1,746✔
1252
  pMgmt->state.dropVnodes = 0;
1,746✔
1253
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
1,746!
1254

1255
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
1256
    SVnodeThread *pThread = &threads[t];
34,920✔
1257
    if (pThread->vnodeNum == 0) continue;
34,920✔
1258

1259
    TdThreadAttr thAttr;
1260
    (void)taosThreadAttrInit(&thAttr);
1,117✔
1261
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,117✔
1262
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
1,117!
1263
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
×
1264
    }
1265

1266
    (void)taosThreadAttrDestroy(&thAttr);
1,117✔
1267
  }
1268

1269
  for (int32_t t = 0; t < threadNum; ++t) {
36,666✔
1270
    SVnodeThread *pThread = &threads[t];
34,920✔
1271
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
34,920!
1272
      (void)taosThreadJoin(pThread->thread, NULL);
1,117✔
1273
      taosThreadClear(&pThread->thread);
1,117✔
1274
    }
1275
    taosMemoryFree(pThread->ppVnodes);
34,920!
1276
  }
1277
  taosMemoryFree(threads);
1,746!
1278

1279
  for (int32_t i = 0; i < numOfVnodes; ++i) {
2,887✔
1280
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
1,141!
1281
    vmReleaseVnode(pMgmt, ppVnodes[i]);
1,141✔
1282
  }
1283

1284
  if (ppVnodes != NULL) {
1,746!
1285
    taosMemoryFree(ppVnodes);
1,746!
1286
  }
1287

1288
  return vmInitTimer(pMgmt);
1,746✔
1289

1290
_exit:
1291
  for (int32_t t = 0; t < threadNum; ++t) {
1292
    SVnodeThread *pThread = &threads[t];
1293
    taosMemoryFree(pThread->ppVnodes);
1294
  }
1295
  taosMemoryFree(threads);
1296
  return code;
1297
}
1298

1299
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
1,746✔
1300

1301
SMgmtFunc vmGetMgmtFunc() {
1,759✔
1302
  SMgmtFunc mgmtFunc = {0};
1,759✔
1303
  mgmtFunc.openFp = vmInit;
1,759✔
1304
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
1,759✔
1305
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
1,759✔
1306
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
1,759✔
1307
  mgmtFunc.requiredFp = vmRequire;
1,759✔
1308
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
1,759✔
1309

1310
  return mgmtFunc;
1,759✔
1311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc