• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.94
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "tfs.h"
20
#include "vnd.h"
21

22
/*
 * Look up the primary disk recorded for the vnode of vgroup `vgId`.
 *
 * Returns the vnode's diskPrimary when the vgroup is present in pMgmt->hash,
 * otherwise -1. The lookup runs under the read side of pMgmt->lock and does
 * not take a reference on the vnode.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  // the lookup result is delivered through pFound; it stays NULL when vgId is unknown
  (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t primary = (pFound == NULL) ? -1 : pFound->diskPrimary;
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return primary;
}
34

35
/*
 * Choose the level-0 disk that should host the vnode of vgroup `vgId`.
 *
 * Steps:
 *   1. Without a tiered file system (pMgmt->pTfs == NULL) return disk 0.
 *   2. If the vnode info file (or its temporary variant) already exists on a
 *      level-0 disk, return that disk.
 *   3. Otherwise count the open vnodes per primary disk and return the disk
 *      with the fewest vnodes (lowest id wins on ties).
 *
 * Returns the chosen disk id, or the non-zero code of vmGetVnodeListFromHash
 * when the vnode list cannot be obtained.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    return code;
  }
  for (int32_t v = 0; v < numOfVnodes; v++) {
    SVnodeObj *pVnode = (ppVnodes != NULL) ? ppVnodes[v] : NULL;
    if (pVnode == NULL) continue;

    // never index the counter array with an id outside of its bounds
    int32_t primary = pVnode->diskPrimary;
    if (primary < 0 || primary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, primary disk:%d is out of range, ignored when allocating disk", pVnode->vgId, primary);
      continue;
    }
    disks[primary] += 1;
  }

  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  // only scan ids that exist in the counter array
  int32_t scanLimit = (ndisk < TFS_MAX_DISKS_PER_TIER) ? ndisk : TFS_MAX_DISKS_PER_TIER;
  diskId = 0;
  for (int32_t id = 0; id < scanLimit; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  // each listed vnode is released here, so the list is presumably returned with a reference held
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
  return diskId;
}
95

96
/*
 * Find the vnode of vgroup `vgId` and take a reference on it.
 *
 * With `strict` set, vnodes flagged as dropped or failed are treated as if
 * they did not exist. On a miss, terrno is set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is returned. On a hit the vnode's
 * refCount is incremented; the caller gives the reference back through
 * vmReleaseVnode().
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pResult = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pResult);

  bool usable = (pResult != NULL);
  if (usable && strict) {
    usable = !(pResult->dropped || pResult->failed);
  }

  if (usable) {
    int32_t refs = atomic_add_fetch_32(&pResult->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pResult->vgId, pResult, refs);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pResult = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return pResult;
}
112

113
/* Strict acquire: same as vmAcquireVnodeImpl() with `strict` set to true. */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
6,191,555✔
114

115
/*
 * Drop one reference from `pVnode` (NULL is accepted and ignored).
 * The decrement is atomic and done without taking pMgmt->lock.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
123

124
/*
 * Free the vnode object referenced by *ppVnode and reset *ppVnode to NULL.
 *
 * Blocks while the object is still referenced: the refCount is polled every
 * 200 ms (with a warning on each round) until it is no longer positive.
 * A NULL `ppVnode` or NULL `*ppVnode` is a no-op.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  for (int32_t refs = atomic_load_32(&pObj->refCount); refs > 0; refs = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
140

141
/*
 * Create the management object for an opened vnode and register it.
 *
 * pCfg  - wrapper config providing vgId, vgVersion, diskPrimary and path
 * pImpl - opened vnode implementation; when NULL the object is registered in
 *         "failed" mode and no queues are allocated for it
 *
 * Under the write side of pMgmt->lock the function replaces any previous
 * object with the same vgId in pMgmt->hash and drops a stale record of the
 * vgroup from pMgmt->closedHash.
 *
 * Returns the result of inserting into pMgmt->hash, or -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->diskPrimary = pCfg->diskPrimary;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->failed = 0;
  pVnode->path = taosStrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (pImpl) {
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pVnode->path);
      taosMemoryFree(pVnode);
      return -1;
    }
  } else {
    pVnode->failed = 1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));

  pOld = NULL;
  r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  // remember whether a closed record existed before vmFreeVnodeObj() resets pOld
  bool wasClosed = (pOld != NULL);
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }

  if (wasClosed) {
    dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  }
  r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  // a failed removal is only an error when there was an entry to remove;
  // for a vgroup that was never closed the miss is the normal case
  if (r != 0 && wasClosed) {
    dError("vgId:%d, failed to remove vnode from closedHash", pVnode->vgId);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return code;
}
203

204
/*
 * Close one vnode and free its management object.
 *
 * commitAndRemoveWal - commit the vnode data before closing and recreate an
 *                      empty wal directory afterwards (logged as "vnode split")
 * keepClosed         - keep a lightweight copy of the object (ids, version,
 *                      primary disk, dropped flag) in pMgmt->closedHash
 *
 * Sequence: remove the vnode from pMgmt->hash, release one reference, then -
 * unless the vnode is in failed mode - pre-close it, wait until it is no
 * longer referenced and its queues are drained, post-close and close the
 * implementation. A dropped vnode is destroyed on disk. Finally the object is
 * freed, so `pVnode` must not be used by the caller afterwards.
 *
 * If the closedHash copy cannot be allocated the function logs the error and
 * returns right after removing the vnode from pMgmt->hash.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char    path[TSDB_FILENAME_LEN] = {0};
  bool    atExit = true;
  // initialised up front: the failed-vnode path jumps to _closed and never queries the implementation
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
  if (keepClosed) {
    // calloc already zeroes the object; check the new allocation before touching it
    SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
    if (pClosedVnode == NULL) {
      dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr());
      (void)taosThreadRwlockUnlock(&pMgmt->lock);
      return;
    }

    pClosedVnode->vgId = pVnode->vgId;
    pClosedVnode->dropped = pVnode->dropped;
    pClosedVnode->vgVersion = pVnode->vgVersion;
    pClosedVnode->diskPrimary = pVnode->diskPrimary;
    pClosedVnode->toVgId = pVnode->toVgId;

    SVnodeObj *pOld = NULL;
    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
    if (r != 0) {
      dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
    }
    if (pOld) {
      vmFreeVnodeObj(&pOld);
    }
    dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
    r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
    if (r != 0) {
      dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
      // the table did not take the pointer, so nothing else will free the copy
      taosMemoryFree(pClosedVnode);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
330

331
/*
 * Finish a pending vgroup id change recorded in `pCfg`.
 *
 * Nothing is done when pCfg->toVgId is 0. Otherwise vnodeRestoreVgroupId() is
 * called with the vnode directories of the current and the target id; on
 * success pCfg->vgId takes the id it returns and pCfg->toVgId is cleared.
 *
 * Returns 0 on success (or when nothing is pending) and -1 when the restore
 * does not yield a positive vgroup id.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromVgId = pCfg->vgId;
  const int32_t toVgId = pCfg->toVgId;
  if (toVgId == 0) {
    return 0;
  }

  char fromPath[TSDB_FILENAME_LEN];
  char toPath[TSDB_FILENAME_LEN];
  snprintf(fromPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromVgId);
  snprintf(toPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toVgId);

  int32_t restored = vnodeRestoreVgroupId(fromPath, toPath, fromVgId, toVgId, pCfg->diskPrimary, pTfs);
  if (restored > 0) {
    pCfg->vgId = restored;
    pCfg->toVgId = 0;
    return 0;
  }

  dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, fromPath);
  return -1;
}
353

354
static void *vmOpenVnodeInThread(void *param) {
415✔
355
  SVnodeThread *pThread = param;
415✔
356
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
415✔
357
  char          path[TSDB_FILENAME_LEN];
358

359
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
415!
360
  setThreadName("open-vnodes");
415✔
361

362
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
830✔
363
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
415✔
364
    if (pCfg->dropped) {
415!
365
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
366
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
367
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
368
      tmsgReportStartup("vnode-destroy", stepDesc);
×
369

370
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
371
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
372
      pThread->updateVnodesList = true;
×
373
      pThread->dropped++;
×
374
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
375
      continue;
×
376
    }
377

378
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
415✔
379
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
415✔
380
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
381
    tmsgReportStartup("vnode-open", stepDesc);
415✔
382

383
    if (pCfg->toVgId) {
415!
384
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
385
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
386
        pThread->failed++;
×
387
        continue;
×
388
      }
389
      pThread->updateVnodesList = true;
×
390
    }
391

392
    int32_t diskPrimary = pCfg->diskPrimary;
415✔
393
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
415✔
394

395
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
415✔
396

397
    if (pImpl == NULL) {
414!
398
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
399
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
400
        pThread->failed++;
×
401
        continue;
×
402
      }
403
    }
404

405
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
414!
406
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
407
      pThread->failed++;
×
408
      continue;
×
409
    }
410

411
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
415!
412
    pThread->opened++;
415✔
413
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
415✔
414
  }
415

416
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
415!
417
        pThread->opened, pThread->dropped, pThread->failed);
418
  return NULL;
415✔
419
}
420

421
/*
 * Create the vnode hash tables and open all vnodes listed on disk.
 *
 * The vnode configs read by vmGetVnodeListFromFile() are distributed
 * round-robin over tsNumOfCores / 2 worker threads (at least one), each
 * running vmOpenVnodeInThread(). After all workers are joined, the number of
 * opened plus dropped vnodes must equal the number of listed vnodes; if any
 * worker changed the list it is written back with vmWriteVnodeListToFile().
 *
 * Returns 0 on success, otherwise a non-zero error (-1 with terrno set, or an
 * error code).
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  // check the table that was just created, not pMgmt->hash again
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // round-robin distribution gives a worker at most numOfVnodes / threadNum + 1 configs
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // the distribution loop below writes into every pCfgs array, so give up here
      int32_t code = terrno;
      dError("failed to allocate memory for vnode configs of thread:%d since %s", t, terrstr());
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return (code != 0) ? code : TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // the vnodes of this worker stay unopened; the total check below then reports the failure
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    terrno = TSDB_CODE_VND_INIT_FAILED;
    return -1;
  }

  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("failed to write vnode list since %s", terrstr());
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
511

512
static void *vmCloseVnodeInThread(void *param) {
1,853✔
513
  SVnodeThread *pThread = param;
1,853✔
514
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,853✔
515

516
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,853!
517
  setThreadName("close-vnodes");
1,853✔
518

519
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
3,912✔
520
    SVnodeObj *pVnode = pThread->ppVnodes[v];
2,059✔
521

522
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
2,059✔
523
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
2,059✔
524
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
525
    tmsgReportStartup("vnode-close", stepDesc);
2,059✔
526

527
    vmCloseVnode(pMgmt, pVnode, false, false);
2,059✔
528
  }
529

530
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
1,853!
531
  return NULL;
1,853✔
532
}
533

534
/*
 * Stop the management workers, close all open vnodes and drop both vnode
 * hash tables.
 *
 * The vnodes returned by vmGetVnodeListFromHash() are distributed round-robin
 * over tsNumOfCores / 2 worker threads (at least one), each running
 * vmCloseVnodeInThread(). If the worker bookkeeping cannot be allocated the
 * vnodes are closed one by one on the calling thread instead. Afterwards
 * pMgmt->hash is cleaned up, the records remaining in pMgmt->closedHash are
 * freed and that table is cleaned up as well.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // round-robin distribution gives a worker at most numOfVnodes / threadNum + 1 vnodes
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  // all-or-nothing: if any part of the worker bookkeeping cannot be allocated,
  // `threads` ends up NULL and the vnodes are closed serially below
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  for (int32_t t = 0; threads != NULL && t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].ppVnodes);
      }
      taosMemoryFree(threads);
      threads = NULL;
    }
  }

  pMgmt->state.openVnodes = 0;

  if (threads == NULL) {
    dError("failed to allocate memory for close threads since %s, close %d vnodes one by one", terrstr(),
           numOfVnodes);
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
      vmCloseVnode(pMgmt, ppVnodes[v], false, false);
    }
  } else {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      int32_t       t = v % threadNum;
      SVnodeThread *pThread = &threads[t];
      if (ppVnodes != NULL) {
        pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
      }
    }

    dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
      }

      (void)taosThreadAttrDestroy(&thAttr);
    }

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        (void)taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
      taosMemoryFree(pThread->ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  if (pMgmt->closedHash != NULL) {
    // free the lightweight records of closed vnodes before dropping the table
    void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->closedHash, pIter);
    }

    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
619

620
/*
 * Tear down the vnode management module: close all vnodes, stop the workers,
 * clean up the vnode library, destroy the locks and free `pMgmt`.
 *
 * A NULL `pMgmt` is accepted and ignored; vmInit() reaches its failure path
 * with a NULL pointer when the allocation of the management object fails.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  (void)taosThreadRwlockDestroy(&pMgmt->lock);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);
  taosMemoryFree(pMgmt);
}
853✔
628

629
/*
 * Run the sync timeout check on every vnode currently in the hash.
 *
 * Vnodes in failed mode are skipped for the check itself, but every listed
 * vnode is released again and the list is freed.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     vnodeCount = 0;
  SVnodeObj **ppList = NULL;

  int32_t code = vmGetVnodeListFromHash(pMgmt, &vnodeCount, &ppList);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }
  if (ppList == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    SVnodeObj *pCurrent = ppList[idx];
    if (pCurrent->failed == 0) {
      vnodeSyncCheckTimeout(pCurrent->pImpl);
    }
    vmReleaseVnode(pMgmt, pCurrent);
  }
  taosMemoryFree(ppList);
}
650

651
static void *vmThreadFp(void *param) {
853✔
652
  SVnodeMgmt *pMgmt = param;
853✔
653
  int64_t     lastTime = 0;
853✔
654
  setThreadName("vnode-timer");
853✔
655

656
  while (1) {
662,905✔
657
    lastTime++;
663,758✔
658
    taosMsleep(100);
663,758✔
659
    if (pMgmt->stop) break;
663,758✔
660
    if (lastTime % 10 != 0) continue;
662,905✔
661

662
    int64_t sec = lastTime / 10;
65,904✔
663
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
65,904✔
664
      vmCheckSyncTimeout(pMgmt);
1,942✔
665
    }
666
  }
667

668
  return NULL;
853✔
669
}
670

671
/*
 * Start the joinable timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * Returns 0 on success, or the system error derived from errno when the
 * thread cannot be created. The thread attribute object is destroyed on both
 * paths.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // capture errno before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
685

686
/*
 * Ask the timer thread to stop and wait for it.
 * The join is skipped when pMgmt->thread is not a valid thread handle.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }
  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
853✔
693

694
/*
 * Create and initialise the vnode management module.
 *
 * pInput  - data, path, name, message callbacks, tiered file system and the
 *           stop-dnode callback supplied by the caller
 * pOutput - receives the new management object in pOutput->pMgmt on success
 *
 * Initialisation order: locks, tfs, wal, sync, vnode library, workers, open
 * all vnodes, udf client. Each completed step is reported through
 * tmsgReportStartup().
 *
 * Returns 0 on success. On failure a non-zero code is returned, the partially
 * built object is handed to vmCleanup() and pOutput is left untouched.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    // never report success without a management object
    if (code == 0) code = -1;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->lock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    // `code` is 0 here after the successful lock init; without resetting it the
    // _OVER block would report success for a module that has no file system
    code = -1;
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // nothing to tear down when the management object itself could not be allocated
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
777

778
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
874✔
779
  *required = tsNumOfSupportVnodes > 0;
874✔
780
  return 0;
874✔
781
}
782

783
static void *vmRestoreVnodeInThread(void *param) {
415✔
784
  SVnodeThread *pThread = param;
415✔
785
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
415✔
786

787
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
415!
788
  setThreadName("restore-vnodes");
415✔
789

790
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
830✔
791
    SVnodeObj *pVnode = pThread->ppVnodes[v];
415✔
792
    if (pVnode->failed) {
415!
793
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
794
      continue;
×
795
    }
796

797
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
415✔
798
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
415✔
799
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
800
    tmsgReportStartup("vnode-restore", stepDesc);
415✔
801

802
    int32_t code = vnodeStart(pVnode->pImpl);
415✔
803
    if (code != 0) {
415!
804
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
805
      pThread->failed++;
×
806
    } else {
807
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
415!
808
      pThread->opened++;
415✔
809
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
415✔
810
    }
811
  }
812

813
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
415!
814
        pThread->failed);
815
  return NULL;
415✔
816
}
817

818
/*
 * Restore (start) every vnode currently held in the management hash.
 *
 * The vnode list is fetched with vmGetVnodeListFromHash() and dealt
 * round-robin onto max(1, tsNumOfCores / 2) worker slots. Every slot that
 * received at least one vnode gets a joinable thread running
 * vmRestoreVnodeInThread(); all created threads are joined before the
 * function proceeds. Afterwards each listed vnode is handed back through
 * vmReleaseVnode() and the list is freed.
 *
 * Failures of individual vnodes or of thread creation are only logged
 * (best effort, as before). Failure to allocate the bookkeeping memory now
 * aborts the start: all resources obtained so far are released and the error
 * code is returned, instead of silently skipping vnodes or leaking the list.
 *
 * Returns the error from vmGetVnodeListFromHash(), terrno on allocation
 * failure, or otherwise the result of vmInitTimer().
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // +1 leaves room for the remainder of the round-robin distribution below.
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;  // ppVnodes and the listed vnodes still have to be released
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      // No worker has been started yet, so it is safe to bail out here.
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    if (ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Wait for every worker that was actually created before touching its data.
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  // Shared cleanup for the success path and the allocation-failure paths.
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  // NOTE(review): presumably balances a reference taken by
  // vmGetVnodeListFromHash() - confirm against that helper.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    return code;
  }
  return vmInitTimer(pMgmt);
}
902

903
/* Stop hook of the vnode management node: delegates to vmCleanupTimer(). */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
853✔
904

905
SMgmtFunc vmGetMgmtFunc() {
874✔
906
  SMgmtFunc mgmtFunc = {0};
874✔
907
  mgmtFunc.openFp = vmInit;
874✔
908
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
874✔
909
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
874✔
910
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
874✔
911
  mgmtFunc.requiredFp = vmRequire;
874✔
912
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
874✔
913

914
  return mgmtFunc;
874✔
915
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc