• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.14
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Look up the primary disk id of a vnode in the running hash.
 *
 * Returns the vnode's diskPrimary, or -1 when no vnode with `vgId` is
 * registered in pMgmt->runngingHash. The lookup happens under the read side
 * of pMgmt->hashLock; no reference is taken on the vnode.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;
  int32_t    primary = -1;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) primary = pFound->diskPrimary;
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return primary;
}
35

36
/*
 * Free a vnode object together with its path string and NULL the caller's
 * pointer.
 *
 * Polls every 200 ms until the object's refCount is no longer positive, so
 * memory is not released while references are outstanding. A NULL `ppVnode`
 * or NULL `*ppVnode` is a no-op.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) {
    return;
  }

  SVnodeObj *pObj = *ppVnode;

  for (int32_t refs = atomic_load_32(&pObj->refCount); refs > 0; refs = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

53
/*
 * Put a placeholder vnode object into pMgmt->creatingHash recording that
 * `vgId` is being created on primary disk `diskId`.
 *
 * Returns 0 on success. On allocation failure returns terrno. On lock or hash
 * insert failure returns that error code, and the placeholder is freed. A
 * failure to unlock is only logged.
 */
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  SVnodeObj *pPlaceholder = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pPlaceholder == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  // calloc already zero-fills the object; only the identifying fields are set.
  pPlaceholder->vgId = vgId;
  pPlaceholder->diskPrimary = diskId;

  int32_t code = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (code != 0) {
    taosMemoryFree(pPlaceholder);
    return code;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pPlaceholder);
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pPlaceholder, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pPlaceholder);
  }

  int32_t unlockCode = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (unlockCode != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(unlockCode));
  }

  return code;
}
85

86
/*
 * Remove the creating-state placeholder for `vgId` from pMgmt->creatingHash
 * and free it.
 *
 * Best-effort: failures are logged only. The placeholder is freed after
 * hashLock is released (vmFreeVnodeObj() may sleep while refCount > 0). It is
 * freed only if it was actually removed from the hash. Otherwise the entry
 * still points at it and it stays owned by creatingHash.
 */
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (pOld != NULL && r == 0) {
    // Log the object address itself, not the address of the local variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
111

112
/*
 * Pick the level-0 primary disk for vnode `vgId`.
 *
 * 1. If tfsSearch() finds the vnode's info file (VND_INFO_FNAME) or its
 *    temporary variant (VND_INFO_FNAME_TMP) on a level-0 disk, that disk id is
 *    returned directly.
 * 2. Otherwise, under pMgmt->mutex, the vnodes returned by
 *    vmGetAllVnodeListFromHashWithCreating() are counted per primary disk.
 *    The least-used of the first tfsGetDisksAtLevel(pTfs, 0) disks is chosen
 *    (lowest id on ties). A creating-state placeholder is then registered for
 *    `vgId`, which vmCleanPrimaryDisk() removes later.
 *
 * Returns 0 when pMgmt->pTfs is NULL, the chosen disk id on success, or a
 * non-zero error code on failure.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs: reuse the disk that already holds this vnode's info file
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc: count vnodes per primary disk and choose the least used one
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  int32_t     ndisk = 0;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;
    // diskPrimary indexes a fixed-size array; an out-of-range id (e.g. -1)
    // would write out of bounds.
    if (pVnode->diskPrimary < 0 || pVnode->diskPrimary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, primary disk:%d out of range, ignored when allocating disk for vgId:%d", pVnode->vgId,
            pVnode->diskPrimary, vgId);
      continue;
    }
    disks[pVnode->diskPrimary] += 1;
  }

  int32_t minVal = INT_MAX;
  ndisk = tfsGetDisksAtLevel(pTfs, 0);
  // never read past the end of disks[]
  if (ndisk > TFS_MAX_DISKS_PER_TIER) {
    ndisk = TFS_MAX_DISKS_PER_TIER;
  }
  diskId = 0;
  for (int32_t id = 0; id < ndisk; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);

  int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
  if (r != 0) {
    dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    if (code == 0) code = r;
  }

  // drop the references taken by vmGetAllVnodeListFromHashWithCreating()
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  }

  dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
  return diskId;
}
202

203
/*
 * Undo vmAllocPrimaryDisk(): drop the creating-state placeholder registered
 * for `vgId`.
 */
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
204

205
/*
 * Look up vnode `vgId` in the running hash and take a reference on it.
 *
 * With `strict` set, vnodes flagged dropped or failed are treated as absent.
 * Returns the vnode with refCount incremented (release it with
 * vmReleaseVnode()), or NULL with terrno = TSDB_CODE_VND_INVALID_VGROUP_ID.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pObj = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pObj);

  bool usable = (pObj != NULL) && !(strict && (pObj->dropped || pObj->failed));
  if (usable) {
    int32_t refs = atomic_add_fetch_32(&pObj->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pObj = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return pObj;
}
221

222
/*
 * Strict acquire of vnode `vgId`: dropped or failed vnodes yield NULL.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
223

224
/*
 * Drop one reference taken by vmAcquireVnode()/vmAcquireVnodeImpl().
 *
 * A NULL vnode is ignored. The decrement is atomic and no lock is taken;
 * pMgmt is not used.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t refs = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refs);
  }
}
232

233
/*
 * Publish `pVnode` in pMgmt->runngingHash under its vgId.
 *
 * Called with pMgmt->hashLock held for writing (see vmOpenVnode()). An object
 * already registered under the same vgId is overwritten and then freed with
 * vmFreeVnodeObj(). Returns the taosHashPut() result.
 */
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pOld = NULL;

  int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }

  // Overwrite first, free afterwards: if the put fails, the hash still points
  // at the old object, so it must stay valid.
  int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to hash since %s", pVnode->vgId, tstrerror(code));
    return code;
  }

  if (pOld != NULL && pOld != pVnode) {
    vmFreeVnodeObj(&pOld);
  }

  return code;
}
247

248
/*
 * Remove `vgId` from pMgmt->runngingHash without freeing the vnode object.
 *
 * The callers in this file invoke it with hashLock held for writing. A removal
 * failure is only logged.
 */
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);
  int32_t code = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (code == 0) {
    return;
  }
  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(code));
}
255

256
/*
 * Store a snapshot of `pVnode` (vgId, dropped, vgVersion, diskPrimary,
 * toVgId) in pMgmt->closedHash.
 *
 * Called with hashLock held for writing (see vmCloseVnode()). Returns terrno
 * only when the snapshot cannot be allocated. A failed hash insert is logged
 * and tolerated (return 0), so the caller still proceeds with the close.
 * Any previous snapshot for the same vgId is freed only once it has been
 * overwritten.
 */
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }

  // calloc zero-fills; copy only the fields a closed vnode keeps.
  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }

  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (r != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // The snapshot was not stored: free it and leave any previous entry intact.
    taosMemoryFree(pClosedVnode);
    return 0;
  }

  // The previous snapshot has been overwritten and is no longer reachable.
  if (pOld != NULL && pOld != pClosedVnode) {
    vmFreeVnodeObj(&pOld);
  }

  return 0;
}
287

288
/*
 * Remove the closed-state snapshot for pVnode->vgId from pMgmt->closedHash
 * and free it.
 *
 * Called with hashLock held for writing (see vmOpenVnode()). Failures are
 * logged. The snapshot is freed only after it has been removed from the hash.
 * Otherwise closedHash would keep a dangling pointer that vmCloseVnodes()
 * frees a second time at shutdown.
 */
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld == NULL) {
    return;
  }

  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
    // Still owned by closedHash; it is freed when the hash is torn down.
    return;
  }

  vmFreeVnodeObj(&pOld);
}
303

304
/*
 * Wrap an opened vnode implementation in a new SVnodeObj and register it in
 * the running hash.
 *
 * @param pCfg   source of vgId, vgVersion, diskPrimary and path.
 * @param pImpl  opened vnode. When NULL, the object is registered with
 *               failed = 1 and no queues are allocated.
 *
 * On successful registration, any closed-state snapshot for the vgId is
 * dropped. Returns 0 on success. Returns -1 with terrno set on allocation
 * failure, or the registration error code. On every failure path the new
 * object is released again, and pImpl is left for the caller to dispose of.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->diskPrimary = pCfg->diskPrimary;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->failed = 0;
  pVnode->path = taosStrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (pImpl) {
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pVnode->path);
      taosMemoryFree(pVnode);
      return -1;
    }
  } else {
    pVnode->failed = 1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t code = vmRegisterRunningState(pMgmt, pVnode);
  if (code == 0) {
    // only forget the closed-state snapshot once the vnode is actually running
    vmUnRegisterClosedState(pMgmt, pVnode);
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (code != 0) {
    dError("vgId:%d, failed to register vnode since %s", pVnode->vgId, tstrerror(code));
    // pVnode never became reachable through runngingHash; release it here.
    if (pImpl != NULL) {
      // Same teardown order as vmCloseVnode(): stop the multi-workers, then free the queues.
      tMultiWorkerCleanup(&pVnode->pWriteW);
      tMultiWorkerCleanup(&pVnode->pSyncW);
      tMultiWorkerCleanup(&pVnode->pSyncRdW);
      tMultiWorkerCleanup(&pVnode->pApplyW);
      vmFreeQueue(pMgmt, pVnode);
    }
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
  }

  return code;
}
344

345
/*
 * Close `pVnode`, unregister it from the running hash and free it.
 *
 * @param commitAndRemoveWal  when true, the vnode is committed before
 *                            vnodeClose() (skipped for failed vnodes). After
 *                            closing, its wal directory is removed and
 *                            recreated.
 * @param keepClosed          when true, a snapshot is stored in
 *                            pMgmt->closedHash via vmRegisterClosedState().
 *
 * Steps:
 * - Drop the caller's reference.
 * - For non-failed vnodes: wait for refCount to reach 0 and for every vnode
 *   queue to drain, then close pImpl.
 * - If the vnode is flagged dropped, destroy its directory.
 *
 * `pVnode` is freed on return, except on the early exit taken when
 * vmRegisterClosedState() fails.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Initialized up front: the `failed` path jumps to _closed past the point
  // where it is read from pImpl, yet vnodeDestroy() below still uses it.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      // NOTE(review): the vnode is already out of runngingHash but is neither
      // released nor closed on this path; confirm this early exit is intended.
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  // drop the caller's reference before waiting for the count to reach 0
  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    // vmOpenVnode() registers failed vnodes without pImpl or queues
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
  while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ);
  while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
457

458
/*
 * Unregister `vgId` from pMgmt->runngingHash under the hashLock write lock.
 *
 * The vnode object itself is not freed here. Lock and unlock failures are
 * logged. If the lock cannot be taken, nothing else is done, because unlocking
 * a lock that is not held is undefined behavior.
 */
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
472

473
/*
 * When pCfg->toVgId is non-zero, call vnodeRestoreVgroupId() to move the vnode
 * from "vnode/vnode<vgId>" to "vnode/vnode<toVgId>". On success, pCfg->vgId
 * becomes the returned id and toVgId is cleared.
 *
 * Returns 0 on success or when there is nothing to restore, -1 when
 * vnodeRestoreVgroupId() returns a non-positive id.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t srcVgId = pCfg->vgId;
  const int32_t dstVgId = pCfg->toVgId;
  if (dstVgId == 0) {
    return 0;
  }

  char srcPath[TSDB_FILENAME_LEN];
  char dstPath[TSDB_FILENAME_LEN];
  snprintf(srcPath, sizeof(srcPath), "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, sizeof(dstPath), "vnode%svnode%d", TD_DIRSEP, dstVgId);

  int32_t restored = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pCfg->diskPrimary, pTfs);
  if (restored <= 0) {
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
    return -1;
  }

  pCfg->vgId = restored;
  pCfg->toVgId = 0;
  return 0;
}
495

496
static void *vmOpenVnodeInThread(void *param) {
×
497
  SVnodeThread *pThread = param;
×
498
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
×
499
  char          path[TSDB_FILENAME_LEN];
500

501
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
×
502
  setThreadName("open-vnodes");
×
503

504
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
×
505
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
×
506
    if (pCfg->dropped) {
×
507
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
508
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
509
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
510
      tmsgReportStartup("vnode-destroy", stepDesc);
×
511

512
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
513
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
514
      pThread->updateVnodesList = true;
×
515
      pThread->dropped++;
×
516
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
517
      continue;
×
518
    }
519

520
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
521
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
×
522
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
523
    tmsgReportStartup("vnode-open", stepDesc);
×
524

525
    if (pCfg->toVgId) {
×
526
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
527
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
528
        pThread->failed++;
×
529
        continue;
×
530
      }
531
      pThread->updateVnodesList = true;
×
532
    }
533

534
    int32_t diskPrimary = pCfg->diskPrimary;
×
535
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
536

537
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
×
538

539
    if (pImpl == NULL) {
×
540
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
541
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
542
        pThread->failed++;
×
543
        continue;
×
544
      }
545
    }
546

547
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
×
548
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
549
      pThread->failed++;
×
550
      continue;
×
551
    }
552

553
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
×
554
    pThread->opened++;
×
555
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
×
556
  }
557

558
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
×
559
        pThread->opened, pThread->dropped, pThread->failed);
560
  return NULL;
×
561
}
562

563
/*
 * Create the running, closed and creating vnode hashes, then open every vnode
 * listed by vmGetVnodeListFromFile().
 *
 * Configs are distributed round-robin over max(tsNumOfCores / 2, 1) threads
 * running vmOpenVnodeInThread(). If any thread reports updateVnodesList, the
 * list is written back with vmWriteVnodeListToFile().
 *
 * Returns 0 on success and TSDB_CODE_VND_INIT_FAILED when
 * openVnodes + dropVnodes != totalVnodes. Otherwise returns the error of the
 * failing step.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->runngingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->runngingHash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  int32_t      code = 0;
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
    return code;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // the distribution loop below writes into pCfgs, so it must exist
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      dError("thread:%d, failed to allocate memory for vnode cfgs since %s", t, tstrerror(code));
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return terrno = TSDB_CODE_VND_INIT_FAILED;
  }

  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
    dError("failed to write vnode list since %s", tstrerror(code));
    return code;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
664

665
static void *vmCloseVnodeInThread(void *param) {
5✔
666
  SVnodeThread *pThread = param;
5✔
667
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
5✔
668

669
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
5!
670
  setThreadName("close-vnodes");
5✔
671

672
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
10✔
673
    SVnodeObj *pVnode = pThread->ppVnodes[v];
5✔
674

675
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
5✔
676
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
5✔
677
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
678
    tmsgReportStartup("vnode-close", stepDesc);
5✔
679

680
    vmCloseVnode(pMgmt, pVnode, false, false);
5✔
681
  }
682

683
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
5!
684
  return NULL;
5✔
685
}
686

687
/*
 * Close every registered vnode at shutdown and release the vnode hashes.
 *
 * Steps:
 * - Stop the mgmt workers.
 * - Call tqNotifyClose() for each vnode.
 * - Close the vnodes on max(tsNumOfCores / 2, 1) threads running
 *   vmCloseVnodeInThread(). Vnodes that cannot be handed to a worker thread
 *   (thread array or per-thread list allocation fails, or the thread cannot
 *   be created) are closed in the calling thread instead of being skipped.
 * - Free the objects left in closedHash and creatingHash, then clean up all
 *   three hashes.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s, close vnodes in current thread", terrstr());
  } else {
    for (int32_t t = 0; t < threadNum; ++t) {
      threads[t].threadIndex = t;
      threads[t].pMgmt = pMgmt;
      threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    }

    for (int32_t v = 0; v < numOfVnodes; ++v) {
      int32_t       t = v % threadNum;
      SVnodeThread *pThread = &threads[t];
      if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
        pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
      }
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  int64_t st = taosGetTimestampMs();
  dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] != NULL) {
        if (ppVnodes[i]->pImpl != NULL) {
          tqNotifyClose(ppVnodes[i]->pImpl->pTq);
        }
      }
    }
  }

  int64_t et = taosGetTimestampMs();
  dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);

  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
      (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
      if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
        // Close this share in the calling thread rather than leaving it open.
        for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
          vmCloseVnode(pMgmt, pThread->ppVnodes[v], false, false);
        }
        pThread->vnodeNum = 0;  // nothing left to join for this slot
      }

      (void)taosThreadAttrDestroy(&thAttr);
    }
  }

  // Vnodes that were not handed to any worker (no thread array, or their slot's
  // list could not be allocated) are closed here.
  if (ppVnodes != NULL) {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      bool assigned = (threads != NULL) && (threads[v % threadNum].ppVnodes != NULL);
      if (!assigned && ppVnodes[v] != NULL) {
        vmCloseVnode(pMgmt, ppVnodes[v], false, false);
      }
    }
  }

  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        (void)taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
      taosMemoryFree(pThread->ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
  }

  if (pMgmt->closedHash != NULL) {
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
  }

  if (pMgmt->creatingHash != NULL) {
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
802

803
/*
 * Tear down the vnode management module. Steps, in order: close all vnodes,
 * stop the workers, call vnodeCleanup(), destroy the locks and free pMgmt.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  // shut down vnodes and workers first ...
  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();

  // ... then the synchronization primitives ...
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);

  // ... and finally the management object itself.
  taosMemoryFree(pMgmt);
}
812

813
/*
 * Sync-timeout sweep invoked from the vnode timer thread.
 *
 * Snapshots the vnode list via vmGetVnodeListFromHash(), calls
 * vnodeSyncCheckTimeout() on every vnode not flagged as failed, and hands each
 * entry back with vmReleaseVnode() before freeing the snapshot array.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     count = 0;
  SVnodeObj **list = NULL;

  int32_t rc = vmGetVnodeListFromHash(pMgmt, &count, &list);
  if (rc != 0) {
    dError("failed to get vnode list since %s", tstrerror(rc));
    return;
  }

  if (list == NULL) return;

  for (int32_t idx = 0; idx < count; idx++) {
    SVnodeObj *pObj = list[idx];
    if (!pObj->failed) vnodeSyncCheckTimeout(pObj->pImpl);
    vmReleaseVnode(pMgmt, pObj);
  }

  taosMemoryFree(list);
}
834

835
static void *vmThreadFp(void *param) {
9✔
836
  SVnodeMgmt *pMgmt = param;
9✔
837
  int64_t     lastTime = 0;
9✔
838
  setThreadName("vnode-timer");
9✔
839

840
  while (1) {
271✔
841
    lastTime++;
280✔
842
    taosMsleep(100);
280✔
843
    if (pMgmt->stop) break;
280✔
844
    if (lastTime % 10 != 0) continue;
271✔
845

846
    int64_t sec = lastTime / 10;
23✔
847
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
23!
848
      vmCheckSyncTimeout(pMgmt);
×
849
    }
850
  }
851

852
  return NULL;
9✔
853
}
854

855
/*
 * Start the joinable "vnode-timer" thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread could not be
 * created. The thread attribute object is destroyed on every path; previously
 * it leaked when thread creation failed.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture ERRNO before any further call can overwrite it.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
872

873
/*
 * Ask the timer thread to exit by raising pMgmt->stop, then join it and clear
 * the stored handle. Nothing is joined if pMgmt->thread is not a valid handle.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) return;

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
9✔
880

881
/*
 * Open hook of the vnode management module.
 *
 * Allocates an SVnodeMgmt, copies the input options, and routes its msgCb
 * through vmPutRpcMsgToQueue / vmGetQueueSize. It then initialises, in order:
 * the hash rwlock, mutex and fileLock; checks that a TFS handle was supplied;
 * then walInit, syncInit, vnodeInit, the worker pool (vmStartWorker), all
 * vnodes (vmOpenVnodes), and finally udfcOpen.
 *
 * On success pOutput->pMgmt receives the new object and 0 is returned.
 * On failure the partially built object is released via vmCleanup() and the
 * error code is returned.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    // code is still 0 from the successful mutex init above; without this the
    // function would report success with no TFS attached.
    code = TSDB_CODE_INVALID_PARA;
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is NULL when its own allocation failed; nothing to clean up then.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
970

971
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
9✔
972
  *required = tsNumOfSupportVnodes > 0;
9✔
973
  return 0;
9✔
974
}
975

976
static void *vmRestoreVnodeInThread(void *param) {
×
977
  SVnodeThread *pThread = param;
×
978
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
×
979

980
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
×
981
  setThreadName("restore-vnodes");
×
982

983
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
×
984
    SVnodeObj *pVnode = pThread->ppVnodes[v];
×
985
    if (pVnode->failed) {
×
986
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
987
      continue;
×
988
    }
989

990
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
991
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
×
992
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
993
    tmsgReportStartup("vnode-restore", stepDesc);
×
994

995
    int32_t code = vnodeStart(pVnode->pImpl);
×
996
    if (code != 0) {
×
997
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
998
      pThread->failed++;
×
999
    } else {
1000
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1001
      pThread->opened++;
×
1002
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
×
1003
    }
1004
  }
1005

1006
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
×
1007
        pThread->failed);
1008
  return NULL;
×
1009
}
1010

1011
/*
 * Start hook: restore every vnode currently registered, then start the vnode
 * timer thread.
 *
 * The list returned by vmGetVnodeListFromHash() is distributed round-robin
 * over max(tsNumOfCores / 2, 1) joinable threads running
 * vmRestoreVnodeInThread(). After all threads are joined, each listed vnode is
 * handed back with vmReleaseVnode().
 *
 * Returns 0 / the vmInitTimer() result on success. If the thread bookkeeping
 * cannot be allocated, the error code is returned, and the vnode references
 * and the list are still released. Per-vnode restore failures are only logged
 * by the worker threads.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      // Previously this error was recorded and then dropped, leaving some
      // vnodes silently unrestored; fail the start instead.
      code = terrno;
      goto _exit;
    }
  }

  if (ppVnodes != NULL) {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      SVnodeThread *pThread = &threads[v % threadNum];
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  // Shared cleanup for success and failure: no restore thread is running here.
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to start vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1095

1096
/* Stop hook: signal the vnode timer thread to exit and join it. */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
9✔
1097

1098
SMgmtFunc vmGetMgmtFunc() {
9✔
1099
  SMgmtFunc mgmtFunc = {0};
9✔
1100
  mgmtFunc.openFp = vmInit;
9✔
1101
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
9✔
1102
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
9✔
1103
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
9✔
1104
  mgmtFunc.requiredFp = vmRequire;
9✔
1105
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
9✔
1106

1107
  return mgmtFunc;
9✔
1108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc