• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3545

02 Dec 2024 06:22AM UTC coverage: 60.839% (-0.04%) from 60.88%
#3545

push

travis-ci

web-flow
Merge pull request #28961 from taosdata/fix/refactor-vnode-management-open-vnode

fix/refactor-vnode-management-open-vnode

120592 of 253473 branches covered (47.58%)

Branch coverage included in aggregate %.

102 of 145 new or added lines in 3 files covered. (70.34%)

477 existing lines in 108 files now uncovered.

201840 of 276506 relevant lines covered (73.0%)

19392204.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.71
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Report the primary disk id stored on the running vnode object for vgId.
 *
 * The lookup in the running hash is done while holding the management
 * read lock. Returns the object's diskPrimary, or -1 when the lookup left
 * the output pointer NULL.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  // The status code is deliberately ignored; only the output pointer is inspected.
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t primary = (pFound != NULL) ? pFound->diskPrimary : -1;
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return primary;
}
35

36
/*
 * Free a vnode object and reset the caller's pointer to NULL.
 *
 * A NULL ppVnode or NULL *ppVnode is a no-op. Otherwise the function polls
 * refCount, sleeping 200ms between reads and logging a warning each time,
 * until the count is no longer positive; it then frees the path string and
 * the object.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) {
    return;
  }

  SVnodeObj *pObj = *ppVnode;

  // Wait for every outstanding reference to be dropped before freeing.
  for (int32_t ref = atomic_load_32(&pObj->refCount); ref > 0; ref = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, ref);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

53
/*
 * Record that vnode vgId is being created on disk diskId.
 *
 * Allocates a placeholder SVnodeObj carrying only vgId and diskPrimary and
 * stores a pointer to it in creatingHash under the management write lock.
 *
 * Returns 0 on success, terrno when the allocation fails, the lock error
 * when the write lock cannot be taken, or the taosHashPut error (in which
 * case the placeholder is freed again). An unlock failure is only logged.
 */
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  SVnodeObj *pPlaceholder = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pPlaceholder == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pPlaceholder, 0, sizeof(SVnodeObj));
  pPlaceholder->vgId = vgId;
  pPlaceholder->diskPrimary = diskId;

  int32_t code = taosThreadRwlockWrlock(&pMgmt->lock);
  if (code != 0) {
    taosMemoryFree(pPlaceholder);
    return code;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pPlaceholder);
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pPlaceholder, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pPlaceholder);
  }

  int32_t unlockCode = taosThreadRwlockUnlock(&pMgmt->lock);
  if (unlockCode != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(unlockCode));
  }

  return code;
}
85

86
/*
 * Drop the "creating" placeholder registered for vgId.
 *
 * Under the management write lock the placeholder pointer is copied out of
 * creatingHash and the entry is removed. The placeholder itself is freed
 * after the lock has been released, because vmFreeVnodeObj may sleep while
 * waiting for references to drain.
 *
 * Failures of the hash operations are logged; nothing is returned.
 */
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  if (r != 0) {
    // Logged once, with the reason (previously reported twice via an unused label).
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  if (pOld != NULL) {
    // Print the object pointer itself, not the address of the local variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
11,186✔
111

112
/*
 * Pick the level-0 disk on which vnode vgId should live.
 *
 * 1. Without a tfs handle the answer is disk 0.
 * 2. If tfsSearch finds the vnode's info file (or its temporary variant) at
 *    level 0, that disk id is returned.
 * 3. Otherwise, under pMgmt->mutex, the vnodes returned by
 *    vmGetAllVnodeListFromHashWithCreating are counted per diskPrimary, the
 *    disk with the fewest vnodes is chosen (lowest id wins ties), and the
 *    choice is recorded through vmRegisterCreatingState.
 *
 * Returns the chosen disk id on success. On failure the error code of the
 * failing step is returned through the same int32_t.
 * NOTE(review): callers must be able to tell an error code from a disk id;
 * confirm that every possible error value is negative.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  STfs *pTfs = pMgmt->pTfs;
  if (pTfs == NULL) {
    return 0;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  int32_t diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  int32_t code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  // Count vnodes per disk. Entries that are NULL or carry a disk id outside
  // the counter array are skipped instead of corrupting the stack.
  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;

    int32_t used = pVnode->diskPrimary;
    if (used < 0 || used >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, skip vnode with invalid primary disk:%d", pVnode->vgId, used);
      continue;
    }
    disks[used] += 1;
  }

  // Never scan past the counter array, whatever tfs reports.
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  int32_t nscan = (ndisk > TFS_MAX_DISKS_PER_TIER) ? TFS_MAX_DISKS_PER_TIER : ndisk;
  int32_t minVal = INT_MAX;
  diskId = 0;
  for (int32_t id = 0; id < nscan; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);

  int32_t unlockCode = taosThreadMutexUnlock(&pMgmt->mutex);
  if (unlockCode != 0) {
    dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(unlockCode));
    // A registration error takes precedence; otherwise report the unlock error.
    if (code == 0) code = unlockCode;
  }

  // Give back the references held by the vnode list, then the list itself.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  }

  dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
  return diskId;
}
202

203
/*
 * Forget the disk choice recorded for vgId: a thin wrapper that forwards to
 * vmUnRegisterCreatingState.
 */
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
11,186✔
204

205
/*
 * Look up vgId in the running hash and take a reference on the vnode.
 *
 * The lookup and the refCount increment both happen under the management
 * read lock. When strict is true, a vnode flagged dropped or failed is
 * treated as missing.
 *
 * Returns the vnode with refCount incremented, or NULL with terrno set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pVnode = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);

  // The flags are only read once pVnode is known to be non-NULL.
  bool usable = (pVnode != NULL) && !(strict && (pVnode->dropped || pVnode->failed));
  if (usable) {
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pVnode = NULL;
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return pVnode;
}
221

222
/*
 * Strict acquire: same as vmAcquireVnodeImpl with strict set to true.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
49,725,075✔
223

224
/*
 * Drop one reference on pVnode. A NULL pVnode is ignored.
 *
 * The decrement is atomic and is done without taking pMgmt->lock; pMgmt is
 * not used by this function.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
232

233
/*
 * Store pVnode in the running hash under its vgId.
 *
 * If the hash already yields an object for that vgId, that object is freed
 * first. Returns the result of taosHashPut. This function takes no lock
 * itself; the visible callers hold pMgmt->lock for writing around it.
 */
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pPrev = NULL;

  if (taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pPrev) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pPrev != NULL) {
    vmFreeVnodeObj(&pPrev);
  }

  return taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
247

248
/*
 * Remove the entry for vgId from the running hash and log a failure.
 * The stored vnode object is not freed here.
 */
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);

  int32_t rmCode = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (rmCode == 0) {
    return;
  }
  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(rmCode));
}
13,776✔
255

256
/*
 * Remember a vnode that is being closed but kept on this dnode.
 *
 * A fresh SVnodeObj is allocated and receives vgId, dropped, vgVersion,
 * diskPrimary and toVgId from pVnode; a pointer to it is stored in
 * closedHash. An object already present for the same vgId is freed first.
 *
 * Returns 0 on success, terrno when the allocation fails, or the
 * taosHashPut error. On a put failure the new object is freed rather than
 * leaked, matching what vmRegisterCreatingState does.
 */
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));

  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }

  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  int32_t code = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // The hash did not take the pointer, so it must be released here.
    taosMemoryFree(pClosedVnode);
  }

  return code;
}
287

288
/*
 * Discard the closed-state record kept for pVnode's vgId, if there is one.
 *
 * The record is freed first and then its entry is removed from closedHash.
 * Hash failures are logged only.
 */
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pClosed = NULL;

  if (taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pClosed) != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pClosed == NULL) {
    return;  // nothing recorded for this vgId
  }

  vmFreeVnodeObj(&pClosed);
  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  if (taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t)) != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
}
13,776✔
303

304
/*
 * Wrap an opened vnode (pImpl) in an SVnodeObj and publish it as running.
 *
 * The wrapper takes vgId, vgVersion, diskPrimary and a copy of the path
 * from pCfg. With a non-NULL pImpl its queues are allocated through
 * vmAllocQueue; with a NULL pImpl the wrapper is flagged as failed and no
 * queues are allocated. Under the management write lock the wrapper is then
 * registered as running and any closed-state record for the same vgId is
 * dropped.
 *
 * Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when an allocation
 * or vmAllocQueue fails; otherwise the result of vmRegisterRunningState.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->diskPrimary = pCfg->diskPrimary;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->failed = 0;
  pObj->path = taosStrdup(pCfg->path);
  pObj->pImpl = pImpl;

  if (pObj->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  if (pImpl == NULL) {
    // No implementation behind this wrapper: keep it, but mark it failed.
    pObj->failed = 1;
  } else if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t regCode = vmRegisterRunningState(pMgmt, pObj);
  vmUnRegisterClosedState(pMgmt, pObj);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return regCode;
}
344

345
/*
 * Close a running vnode and free its wrapper object.
 *
 * Steps, in order:
 *   - if the vnode has an implementation and is leader, propose a commit;
 *   - under the management write lock, remove it from the running hash and,
 *     when keepClosed is set, record it in the closed hash (if that fails
 *     the function returns early without closing anything further);
 *   - release one reference on the vnode;
 *   - unless the vnode is flagged failed: pre-close, wait for refCount to
 *     reach zero, clean up the write/sync/sync-rd/apply workers, wait for
 *     the fetch, query and stream queues to drain, post-close, free the
 *     queues, optionally commit (commitAndRemoveWal), then vnodeClose;
 *   - when commitAndRemoveWal is set, remove and recreate the wal directory;
 *   - when the vnode is flagged dropped, call vnodeDestroy on its directory;
 *   - free the wrapper.
 *
 * pVnode must not be used by the caller after this function returns
 * through its normal path.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Declared up front with a defined value: the failed-vnode path jumps to
  // _closed and would otherwise hand an indeterminate nodeId to vnodeDestroy.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      (void)taosThreadRwlockUnlock(&pMgmt->lock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  // Atomic load, as in vmFreeVnodeObj, so each iteration re-reads the counter.
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  // Read the node id before the implementation is closed.
  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
445

NEW
446
/*
 * Remove the running-hash entry of a vnode that failed to open.
 *
 * The removal is done under the management write lock. If the lock cannot
 * be taken the failure is logged and the function returns without touching
 * the hash and without unlocking a lock it does not hold.
 */
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
×
460

461
/*
 * Finish a pending vgroup-id change recorded in pCfg.
 *
 * When pCfg->toVgId is 0 nothing is pending and 0 is returned. Otherwise
 * vnodeRestoreVgroupId is called with the source and destination vnode
 * directories; on a positive result pCfg->vgId takes that value and
 * pCfg->toVgId is cleared.
 *
 * Returns 0 on success or when there is nothing to do, -1 on failure.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromId = pCfg->vgId;
  const int32_t toId = pCfg->toVgId;
  if (toId == 0) {
    return 0;
  }

  char srcPath[TSDB_FILENAME_LEN];
  char dstPath[TSDB_FILENAME_LEN];
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toId);

  int32_t restored = vnodeRestoreVgroupId(srcPath, dstPath, fromId, toId, pCfg->diskPrimary, pTfs);
  if (restored > 0) {
    pCfg->vgId = restored;
    pCfg->toVgId = 0;
    return 0;
  }

  dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
  return -1;
}
483

484
static void *vmOpenVnodeInThread(void *param) {
1,030✔
485
  SVnodeThread *pThread = param;
1,030✔
486
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,030✔
487
  char          path[TSDB_FILENAME_LEN];
488

489
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,030!
490
  setThreadName("open-vnodes");
1,031✔
491

492
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,066✔
493
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,035✔
494
    if (pCfg->dropped) {
1,035!
495
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
496
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
497
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
498
      tmsgReportStartup("vnode-destroy", stepDesc);
×
499

500
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
501
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
502
      pThread->updateVnodesList = true;
×
503
      pThread->dropped++;
×
504
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
505
      continue;
×
506
    }
507

508
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,035✔
509
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,035✔
510
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
511
    tmsgReportStartup("vnode-open", stepDesc);
1,035✔
512

513
    if (pCfg->toVgId) {
1,035!
514
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
515
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
516
        pThread->failed++;
×
517
        continue;
×
518
      }
519
      pThread->updateVnodesList = true;
×
520
    }
521

522
    int32_t diskPrimary = pCfg->diskPrimary;
1,035✔
523
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,035✔
524

525
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,035✔
526

527
    if (pImpl == NULL) {
1,035!
528
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
529
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
530
        pThread->failed++;
×
531
        continue;
×
532
      }
533
    }
534

535
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,035!
536
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
537
      pThread->failed++;
×
538
      continue;
×
539
    }
540

541
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,035!
542
    pThread->opened++;
1,035✔
543
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,035✔
544
  }
545

546
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,031!
547
        pThread->opened, pThread->dropped, pThread->failed);
548
  return NULL;
1,031✔
549
}
550

551
/*
 * Create the running/closed/creating hashes and open every vnode listed on
 * disk, using up to tsNumOfCores / 2 worker threads (at least one).
 *
 * The vnode configs read by vmGetVnodeListFromFile are dealt round-robin to
 * the workers; each worker runs vmOpenVnodeInThread. After all workers have
 * been joined, the call fails unless opened + dropped equals the total. If
 * any worker asked for it, the vnode list file is rewritten.
 *
 * Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY when a hash cannot be
 * created; terrno when a worker array allocation fails; -1 otherwise
 * (terrno is TSDB_CODE_VND_INIT_FAILED when the counts do not add up).
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->runngingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->runngingHash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // Round-robin gives a worker at most ceil(n / threads) configs, which this covers.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // Without this check the distribution loop below would write through NULL.
      int32_t code = terrno;
      dError("failed to allocate memory for vnode cfgs since %s", terrstr());
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // The vnodes of this worker stay unopened; the count check below reports it.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    terrno = TSDB_CODE_VND_INIT_FAILED;
    return -1;
  }

  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("failed to write vnode list since %s", terrstr());
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
649

650
static void *vmCloseVnodeInThread(void *param) {
6,972✔
651
  SVnodeThread *pThread = param;
6,972✔
652
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
6,972✔
653

654
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
6,972✔
655
  setThreadName("close-vnodes");
6,974✔
656

657
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
14,009✔
658
    SVnodeObj *pVnode = pThread->ppVnodes[v];
7,037✔
659

660
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
7,037✔
661
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
7,037✔
662
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
663
    tmsgReportStartup("vnode-close", stepDesc);
7,037✔
664

665
    vmCloseVnode(pMgmt, pVnode, false, false);
7,036✔
666
  }
667

668
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
6,972!
669
  return NULL;
6,972✔
670
}
671

672
/*
 * Stop the management workers, close every running vnode and tear down the
 * running/closed/creating hashes.
 *
 * The vnodes returned by vmGetVnodeListFromHash are dealt round-robin to up
 * to tsNumOfCores / 2 worker threads (at least one), each running
 * vmCloseVnodeInThread. If the worker array, or one worker's vnode array,
 * cannot be allocated, the affected vnodes are closed directly on the
 * calling thread instead of being skipped. Objects still stored in the
 * closed and creating hashes are freed before those hashes are destroyed.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    // No worker bookkeeping available: every vnode is closed inline below.
    dError("failed to allocate memory for threads since %s", terrstr());
    threadNum = 0;
  }
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
  }

  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; ++v) {
    SVnodeThread *pThread = (threadNum > 0) ? &threads[v % threadNum] : NULL;
    if (pThread != NULL && pThread->ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    } else {
      // Nowhere to queue this vnode, so close it on the calling thread.
      vmCloseVnode(pMgmt, ppVnodes[v], false, false);
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  if (threads != NULL) {
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  // Each hash is only iterated once it is known to exist.
  if (pMgmt->closedHash != NULL) {
    void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->closedHash, pIter);
    }
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  if (pMgmt->creatingHash != NULL) {
    void *pIter = taosHashIterate(pMgmt->creatingHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->creatingHash, pIter);
    }
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
769

770
/*
 * Tear down the vnode management object.
 *
 * Closes all vnodes, stops the workers, cleans up the vnode library, destroys
 * the rwlock and both mutexes owned by pMgmt, and finally frees pMgmt itself.
 *
 * pMgmt may be NULL: vmInit() calls this on its failure path, including the
 * case where the SVnodeMgmt allocation itself failed. In that case there is
 * nothing to tear down and the call is a no-op.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  (void)taosThreadRwlockDestroy(&pMgmt->lock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);
  taosMemoryFree(pMgmt);
}
2,412✔
779

780
/*
 * Run the sync timeout check on every vnode returned by
 * vmGetVnodeListFromHash(). Vnodes flagged as failed are skipped, but every
 * list entry is handed back through vmReleaseVnode() and the list itself is
 * freed before returning.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     vnodeCount = 0;
  SVnodeObj **vnodeList = NULL;

  int32_t code = vmGetVnodeListFromHash(pMgmt, &vnodeCount, &vnodeList);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  if (vnodeList == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    SVnodeObj *pObj = vnodeList[idx];
    if (!pObj->failed) {
      vnodeSyncCheckTimeout(pObj->pImpl);
    }
    vmReleaseVnode(pMgmt, pObj);
  }

  taosMemoryFree(vnodeList);
}
801

802
static void *vmThreadFp(void *param) {
2,412✔
803
  SVnodeMgmt *pMgmt = param;
2,412✔
804
  int64_t     lastTime = 0;
2,412✔
805
  setThreadName("vnode-timer");
2,412✔
806

807
  while (1) {
1,053,744✔
808
    lastTime++;
1,056,156✔
809
    taosMsleep(100);
1,056,156✔
810
    if (pMgmt->stop) break;
1,056,156✔
811
    if (lastTime % 10 != 0) continue;
1,053,744✔
812

813
    int64_t sec = lastTime / 10;
104,311✔
814
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
104,311✔
815
      vmCheckSyncTimeout(pMgmt);
2,692✔
816
    }
817
  }
818

819
  return NULL;
2,412✔
820
}
821

822
/*
 * Start the joinable timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
 * be created. The thread attribute object is destroyed on both paths.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture errno right away, before any later call can overwrite it.
    code = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  // Destroy the attribute on the failure path too; it was previously leaked there.
  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
836

837
/*
 * Ask the timer thread to stop by setting pMgmt->stop, then, if the stored
 * thread handle is valid, join it and clear the handle.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
2,412✔
844

845
/*
 * Create and initialise the vnode management object.
 *
 * Allocates an SVnodeMgmt, copies the fields it needs from pInput, installs
 * its own queue callbacks, initialises the rwlock and mutexes, then brings up
 * wal, sync, the vnode library, the workers, all vnodes and the udf client in
 * that order, reporting each startup step.
 *
 * On success returns 0 and stores the new object in pOutput->pMgmt.
 * On failure returns a non-zero code, logs it, and releases whatever was
 * set up through vmCleanup(); pOutput is left untouched.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->lock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    // code is 0 here (left over from the successful mutex init above); without
    // resetting it this path would be reported as success. Restore the
    // function's default failure value.
    code = -1;
    dError("tfs is null.");
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is NULL when the allocation itself failed; nothing to clean up then.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
934

935
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,447✔
936
  *required = tsNumOfSupportVnodes > 0;
2,447✔
937
  return 0;
2,447✔
938
}
939

940
static void *vmRestoreVnodeInThread(void *param) {
1,031✔
941
  SVnodeThread *pThread = param;
1,031✔
942
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,031✔
943

944
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,031!
945
  setThreadName("restore-vnodes");
1,031✔
946

947
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,066✔
948
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,035✔
949
    if (pVnode->failed) {
1,035!
950
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
951
      continue;
×
952
    }
953

954
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,035✔
955
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,035✔
956
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
957
    tmsgReportStartup("vnode-restore", stepDesc);
1,035✔
958

959
    int32_t code = vnodeStart(pVnode->pImpl);
1,034✔
960
    if (code != 0) {
1,035!
961
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
962
      pThread->failed++;
×
963
    } else {
964
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,035!
965
      pThread->opened++;
1,035✔
966
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,035✔
967
    }
968
  }
969

970
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,031!
971
        pThread->failed);
972
  return NULL;
1,031✔
973
}
974

975
/*
 * Restore (start) all vnodes using a pool of joinable threads, then start the
 * vnode timer.
 *
 * The vnode list comes from vmGetVnodeListFromHash(); vnodes are distributed
 * round-robin over max(1, tsNumOfCores / 2) threads, each thread runs
 * vmRestoreVnodeInThread(), and all started threads are joined. Every list
 * entry is released through vmReleaseVnode() and all temporary arrays are
 * freed on both the success and the failure path.
 *
 * Returns the result of vmInitTimer() on success, or a non-zero code if the
 * vnode list could not be obtained or a bookkeeping allocation failed.
 * A thread that cannot be created is only logged, as before; its vnodes are
 * not restored.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // Round-robin assignment gives a thread at most ceil(n / threadNum) vnodes,
  // which never exceeds n / threadNum + 1.
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(*threads[t].ppVnodes));
    if (threads[t].ppVnodes == NULL) {
      // Bail out instead of silently skipping the vnodes of this thread.
      code = terrno;
      goto _exit;
    }
  }

  if (ppVnodes != NULL) {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      SVnodeThread *pThread = &threads[v % threadNum];
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Wait for every started worker before its vnode array is freed below.
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    // Hand every list entry back, on the error path as well as on success.
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to restore vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1059

1060
/* Stop hook of the vnode management module: shuts down the timer thread. */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
2,412✔
1061

1062
SMgmtFunc vmGetMgmtFunc() {
2,447✔
1063
  SMgmtFunc mgmtFunc = {0};
2,447✔
1064
  mgmtFunc.openFp = vmInit;
2,447✔
1065
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
2,447✔
1066
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
2,447✔
1067
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
2,447✔
1068
  mgmtFunc.requiredFp = vmRequire;
2,447✔
1069
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
2,447✔
1070

1071
  return mgmtFunc;
2,447✔
1072
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc