• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3620

21 Feb 2025 09:00AM UTC coverage: 63.573% (+0.2%) from 63.423%
#3620

push

travis-ci

web-flow
ci: taosBenchmark add coverage cases branch 3.0 (#29788)

* fix: add unit test for taos-tools

* fix: only .cpp include

* fix: remove no use function

* fix: restore toolsSys.c

* fix: add toolsSys case

* fix: rebuild error fixed

* fix: fix build error

* fix: support get vgroups with core and memory limit

* fix: build error for strcasecmp

* fix: add insertBasic.py case

* fix: add command line set vgroups=3

* fix: change with ns database

* toolscJson read with int replace float and add insertPrecison.py

* fix: add insertBindVGroup.json case

* fix: remove public fun removeQuotation

* fix: vgroups change method

* fix: memory leak for runInsertLimitThread slot

* insertPrecision.py word write wrong

* fix: check isFloat number

* fix: vgroups change logic error

* fix: insertBasic.py real and expect error

* fix: adjust default vgroups

* fix: adjust default vgroups modify comment

148962 of 300203 branches covered (49.62%)

Branch coverage included in aggregate %.

15 of 16 new or added lines in 1 file covered. (93.75%)

2018 existing lines in 133 files now uncovered.

233201 of 300933 relevant lines covered (77.49%)

18174406.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.74
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
10,025✔
24
  int32_t    diskId = -1;
10,025✔
25
  SVnodeObj *pVnode = NULL;
10,025✔
26

27
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
10,025✔
28
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
10,030✔
29
  if (pVnode != NULL) {
10,030!
30
    diskId = pVnode->diskPrimary;
×
31
  }
32
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
10,030✔
33
  return diskId;
10,030✔
34
}
35

36
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
23,585✔
37
  if (!ppVnode || !(*ppVnode)) return;
23,585!
38

39
  SVnodeObj *pVnode = *ppVnode;
23,585✔
40

41
  int32_t refCount = atomic_load_32(&pVnode->refCount);
23,585✔
42
  while (refCount > 0) {
23,585!
43
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
×
44
    taosMsleep(200);
×
45
    refCount = atomic_load_32(&pVnode->refCount);
×
46
  }
47

48
  taosMemoryFree(pVnode->path);
23,585!
49
  taosMemoryFree(pVnode);
23,585!
50
  ppVnode[0] = NULL;
23,585✔
51
}
52

53
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
10,031✔
54
  int32_t    code = 0;
10,031✔
55
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
10,031!
56
  if (pCreatingVnode == NULL) {
10,031!
57
    dError("failed to alloc vnode since %s", terrstr());
×
58
    return terrno;
×
59
  }
60
  (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj));
10,031✔
61

62
  pCreatingVnode->vgId = vgId;
10,031✔
63
  pCreatingVnode->diskPrimary = diskId;
10,031✔
64

65
  code = taosThreadRwlockWrlock(&pMgmt->lock);
10,031✔
66
  if (code != 0) {
10,031!
67
    taosMemoryFree(pCreatingVnode);
×
68
    return code;
×
69
  }
70

71
  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
10,031✔
72
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
10,031✔
73
  if (code != 0) {
10,031!
74
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
×
75
    taosMemoryFree(pCreatingVnode);
×
76
  }
77

78
  int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
10,031✔
79
  if (r != 0) {
10,031!
80
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
81
  }
82

83
  return code;
10,031✔
84
}
85

86
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
10,031✔
87
  SVnodeObj *pOld = NULL;
10,031✔
88

89
  (void)taosThreadRwlockWrlock(&pMgmt->lock);
10,031✔
90
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
10,031✔
91
  if (r != 0) {
10,031!
92
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
×
93
  }
94
  dTrace("vgId:%d, remove from creating Hash", vgId);
10,031✔
95
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
10,031✔
96
  if (r != 0) {
10,031!
97
    dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
×
98
  }
99
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
10,031✔
100

101
  if (pOld) {
10,031!
102
    dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
10,031✔
103
    vmFreeVnodeObj(&pOld);
10,031✔
104
  }
105

106
_OVER:
×
107
  if (r != 0) {
10,031!
108
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
×
109
  }
110
}
10,031✔
111

112
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
10,025✔
113
  int32_t code = 0;
10,025✔
114
  STfs   *pTfs = pMgmt->pTfs;
10,025✔
115
  int32_t diskId = 0;
10,025✔
116
  if (!pTfs) {
10,025!
117
    return diskId;
×
118
  }
119

120
  // search fs
121
  char vnodePath[TSDB_FILENAME_LEN] = {0};
10,025✔
122
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
10,025✔
123
  char fname[TSDB_FILENAME_LEN] = {0};
10,025✔
124
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
10,025✔
125
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
10,025✔
126
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
10,025✔
127

128
  diskId = tfsSearch(pTfs, 0, fname);
10,025✔
129
  if (diskId >= 0) {
10,001!
130
    return diskId;
×
131
  }
132
  diskId = tfsSearch(pTfs, 0, fnameTmp);
10,001✔
133
  if (diskId >= 0) {
10,009!
134
    return diskId;
×
135
  }
136

137
  // alloc
138
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
10,009✔
139
  int32_t     numOfVnodes = 0;
10,009✔
140
  SVnodeObj **ppVnodes = NULL;
10,009✔
141

142
  code = taosThreadMutexLock(&pMgmt->mutex);
10,009✔
143
  if (code != 0) {
10,031!
144
    return code;
×
145
  }
146

147
  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
10,031✔
148
  if (code != 0) {
10,031!
149
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
150
    if (r != 0) {
×
151
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
152
    }
153
    return code;
×
154
  }
155

156
  for (int32_t v = 0; v < numOfVnodes; v++) {
51,798✔
157
    SVnodeObj *pVnode = ppVnodes[v];
41,767✔
158
    disks[pVnode->diskPrimary] += 1;
41,767✔
159
  }
160

161
  int32_t minVal = INT_MAX;
10,031✔
162
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
10,031✔
163
  diskId = 0;
10,031✔
164
  for (int32_t id = 0; id < ndisk; id++) {
20,069✔
165
    if (minVal > disks[id]) {
10,038✔
166
      minVal = disks[id];
10,033✔
167
      diskId = id;
10,033✔
168
    }
169
  }
170
  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
10,031✔
171
  if (code != 0) {
10,031!
172
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
×
173
    if (r != 0) {
×
174
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
×
175
    }
176
    goto _OVER;
×
177
  }
178

179
  code = taosThreadMutexUnlock(&pMgmt->mutex);
10,031✔
180
  if (code != 0) {
10,031!
181
    goto _OVER;
×
182
  }
183

184
_OVER:
10,031✔
185

186
  for (int32_t i = 0; i < numOfVnodes; ++i) {
51,798✔
187
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
41,767!
188
    vmReleaseVnode(pMgmt, ppVnodes[i]);
41,767✔
189
  }
190
  if (ppVnodes != NULL) {
10,031!
191
    taosMemoryFree(ppVnodes);
10,031!
192
  }
193

194
  if (code != 0) {
10,031!
195
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
×
196
    return code;
×
197
  } else {
198
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
10,031!
199
    return diskId;
10,031✔
200
  }
201
}
202

203
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); }
10,031✔
204

205
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
44,301,709✔
206
  SVnodeObj *pVnode = NULL;
44,301,709✔
207

208
  (void)taosThreadRwlockRdlock(&pMgmt->lock);
44,301,709✔
209
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
44,377,390✔
210
  if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
44,364,492!
211
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
43,118✔
212
    pVnode = NULL;
49,867✔
213
  } else {
214
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
44,321,374✔
215
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
44,320,027✔
216
  }
217
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
44,369,898✔
218

219
  return pVnode;
44,377,916✔
220
}
221

222
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVnodeImpl(pMgmt, vgId, true); }
44,255,291✔
223

224
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
44,500,312✔
225
  if (pVnode == NULL) return;
44,500,312!
226

227
  //(void)taosThreadRwlockRdlock(&pMgmt->lock);
228
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
44,500,312✔
229
  dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
44,525,788✔
230
  //(void)taosThreadRwlockUnlock(&pMgmt->lock);
231
}
232

233
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
12,334✔
234
  SVnodeObj *pOld = NULL;
12,334✔
235

236
  int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
12,334✔
237
  if (r != 0) {
12,334!
238
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
×
239
  }
240
  if (pOld) {
12,334!
241
    vmFreeVnodeObj(&pOld);
×
242
  }
243
  int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
12,334✔
244

245
  return code;
12,334✔
246
}
247

248
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
12,327✔
249
  dInfo("vgId:%d, remove from hash", vgId);
12,327!
250
  int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
12,327✔
251
  if (r != 0) {
12,327!
252
    dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r));
×
253
  }
254
}
12,327✔
255

256
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
1,247✔
257
  int32_t    code = 0;
1,247✔
258
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
1,247!
259
  if (pClosedVnode == NULL) {
1,247!
260
    dError("failed to alloc vnode since %s", terrstr());
×
261
    return terrno;
×
262
  }
263
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
1,247✔
264

265
  pClosedVnode->vgId = pVnode->vgId;
1,247✔
266
  pClosedVnode->dropped = pVnode->dropped;
1,247✔
267
  pClosedVnode->vgVersion = pVnode->vgVersion;
1,247✔
268
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
1,247✔
269
  pClosedVnode->toVgId = pVnode->toVgId;
1,247✔
270

271
  SVnodeObj *pOld = NULL;
1,247✔
272
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
1,247✔
273
  if (r != 0) {
1,247!
274
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
275
  }
276
  if (pOld) {
1,247!
277
    vmFreeVnodeObj(&pOld);
×
278
  }
279
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
1,247!
280
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
1,247✔
281
  if (r != 0) {
1,247!
282
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
×
283
  }
284

285
  return code;
1,247✔
286
}
287

288
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
12,334✔
289
  SVnodeObj *pOld = NULL;
12,334✔
290
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
12,334✔
291
  if (r != 0) {
12,334!
292
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
×
293
  }
294
  if (pOld != NULL) {
12,334✔
295
    vmFreeVnodeObj(&pOld);
1,247✔
296
    dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
1,247!
297
    r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
1,247✔
298
    if (r != 0) {
1,247!
299
      dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
×
300
    }
301
  }
302
}
12,334✔
303

304
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
12,334✔
305
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
12,334!
306
  if (pVnode == NULL) {
12,334!
307
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
308
    return -1;
×
309
  }
310

311
  pVnode->vgId = pCfg->vgId;
12,334✔
312
  pVnode->vgVersion = pCfg->vgVersion;
12,334✔
313
  pVnode->diskPrimary = pCfg->diskPrimary;
12,334✔
314
  pVnode->refCount = 0;
12,334✔
315
  pVnode->dropped = 0;
12,334✔
316
  pVnode->failed = 0;
12,334✔
317
  pVnode->path = taosStrdup(pCfg->path);
12,334!
318
  pVnode->pImpl = pImpl;
12,334✔
319

320
  if (pVnode->path == NULL) {
12,334!
321
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
322
    taosMemoryFree(pVnode);
×
323
    return -1;
×
324
  }
325

326
  if (pImpl) {
12,334!
327
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
12,334!
328
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
329
      taosMemoryFree(pVnode->path);
×
330
      taosMemoryFree(pVnode);
×
331
      return -1;
×
332
    }
333
  } else {
334
    pVnode->failed = 1;
×
335
  }
336

337
  (void)taosThreadRwlockWrlock(&pMgmt->lock);
12,334✔
338
  int32_t code = vmRegisterRunningState(pMgmt, pVnode);
12,334✔
339
  vmUnRegisterClosedState(pMgmt, pVnode);
12,334✔
340
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
12,334✔
341

342
  return code;
12,334✔
343
}
344

345
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
12,327✔
346
  char path[TSDB_FILENAME_LEN] = {0};
12,327✔
347
  bool atExit = true;
12,327✔
348

349
  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
12,327✔
350
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
10,314✔
351
  }
352

353
  (void)taosThreadRwlockWrlock(&pMgmt->lock);
12,319✔
354
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
12,327✔
355
  if (keepClosed) {
12,327✔
356
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
1,247!
357
      (void)taosThreadRwlockUnlock(&pMgmt->lock);
×
358
      return;
×
359
    };
360
  }
361
  (void)taosThreadRwlockUnlock(&pMgmt->lock);
12,327✔
362

363
  vmReleaseVnode(pMgmt, pVnode);
12,327✔
364

365
  if (pVnode->failed) {
12,327!
366
    goto _closed;
×
367
  }
368
  dInfo("vgId:%d, pre close", pVnode->vgId);
12,327!
369
  vnodePreClose(pVnode->pImpl);
12,327✔
370

371
  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
12,323!
372
  while (pVnode->refCount > 0) taosMsleep(10);
12,328✔
373

374
  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
12,327!
375
        taosQueueGetThreadId(pVnode->pWriteW.queue));
376
  tMultiWorkerCleanup(&pVnode->pWriteW);
12,327✔
377

378
  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
12,327!
379
        taosQueueGetThreadId(pVnode->pSyncW.queue));
380
  tMultiWorkerCleanup(&pVnode->pSyncW);
12,327✔
381

382
  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
12,327!
383
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
384
  tMultiWorkerCleanup(&pVnode->pSyncRdW);
12,327✔
385

386
  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
12,327!
387
        taosQueueGetThreadId(pVnode->pApplyW.queue));
388
  tMultiWorkerCleanup(&pVnode->pApplyW);
12,327✔
389

390
  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
12,326!
391
        taosQueueGetThreadId(pVnode->pFetchQ));
392
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
12,327!
393

394
  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
12,327!
395
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
12,328✔
396

397
  tqNotifyClose(pVnode->pImpl->pTq);
12,327✔
398

399
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
12,307!
400
        pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
401
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
12,325✔
402

403
  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
12,307!
404
  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
12,307!
405

406
  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
12,307!
407

408
  dInfo("vgId:%d, post close", pVnode->vgId);
12,307!
409
  vnodePostClose(pVnode->pImpl);
12,307✔
410

411
  vmFreeQueue(pMgmt, pVnode);
12,307✔
412

413
  if (commitAndRemoveWal) {
12,307✔
414
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
32!
415
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
32!
UNCOV
416
      dError("vgId:%d, failed to commit data", pVnode->vgId);
×
417
    }
418
    if (vnodeBegin(pVnode->pImpl) != 0) {
32!
UNCOV
419
      dError("vgId:%d, failed to begin", pVnode->vgId);
×
420
    }
421
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
32!
422
  }
423

424
  int32_t nodeId = vnodeNodeId(pVnode->pImpl);
12,307✔
425
  vnodeClose(pVnode->pImpl);
12,305✔
426
  pVnode->pImpl = NULL;
12,307✔
427

428
_closed:
12,307✔
429
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);
12,307!
430

431
  if (commitAndRemoveWal) {
12,307✔
432
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
32✔
433
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
32!
434
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
32!
UNCOV
435
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
×
436
    }
437
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
32!
UNCOV
438
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
×
439
    }
440
  }
441

442
  if (pVnode->dropped) {
12,307✔
443
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
4,568!
444
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
4,568✔
445
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
4,568✔
446
  }
447

448
  vmFreeVnodeObj(&pVnode);
12,307✔
449
}
450

451
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
×
452
  int32_t r = 0;
×
453
  r = taosThreadRwlockWrlock(&pMgmt->lock);
×
454
  if (r != 0) {
×
UNCOV
455
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
×
456
  }
457
  if (r == 0) {
×
UNCOV
458
    vmUnRegisterRunningState(pMgmt, vgId);
×
459
  }
460
  r = taosThreadRwlockUnlock(&pMgmt->lock);
×
461
  if (r != 0) {
×
UNCOV
462
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
×
463
  }
UNCOV
464
}
×
465

466
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
×
467
  int32_t srcVgId = pCfg->vgId;
×
468
  int32_t dstVgId = pCfg->toVgId;
×
UNCOV
469
  if (dstVgId == 0) return 0;
×
470

471
  char srcPath[TSDB_FILENAME_LEN];
472
  char dstPath[TSDB_FILENAME_LEN];
473

474
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
×
UNCOV
475
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
×
476

477
  int32_t diskPrimary = pCfg->diskPrimary;
×
478
  int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
×
479
  if (vgId <= 0) {
×
480
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
×
UNCOV
481
    return -1;
×
482
  }
483

484
  pCfg->vgId = vgId;
×
485
  pCfg->toVgId = 0;
×
UNCOV
486
  return 0;
×
487
}
488

489
static void *vmOpenVnodeInThread(void *param) {
1,007✔
490
  SVnodeThread *pThread = param;
1,007✔
491
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,007✔
492
  char          path[TSDB_FILENAME_LEN];
493

494
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,007!
495
  setThreadName("open-vnodes");
1,007✔
496

497
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,031✔
498
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,024✔
499
    if (pCfg->dropped) {
1,024!
500
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
UNCOV
501
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
502
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
UNCOV
503
      tmsgReportStartup("vnode-destroy", stepDesc);
×
504

505
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
506
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
507
      pThread->updateVnodesList = true;
×
508
      pThread->dropped++;
×
509
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
UNCOV
510
      continue;
×
511
    }
512

513
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,024✔
514
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,024✔
515
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
516
    tmsgReportStartup("vnode-open", stepDesc);
1,024✔
517

518
    if (pCfg->toVgId) {
1,024!
519
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
520
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
521
        pThread->failed++;
×
UNCOV
522
        continue;
×
523
      }
UNCOV
524
      pThread->updateVnodesList = true;
×
525
    }
526

527
    int32_t diskPrimary = pCfg->diskPrimary;
1,024✔
528
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,024✔
529

530
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,024✔
531

532
    if (pImpl == NULL) {
1,024!
533
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
534
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
535
        pThread->failed++;
×
UNCOV
536
        continue;
×
537
      }
538
    }
539

540
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,024!
541
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
542
      pThread->failed++;
×
UNCOV
543
      continue;
×
544
    }
545

546
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,024!
547
    pThread->opened++;
1,024✔
548
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,024✔
549
  }
550

551
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,007!
552
        pThread->opened, pThread->dropped, pThread->failed);
553
  return NULL;
1,007✔
554
}
555

556
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
2,267✔
557
  pMgmt->runngingHash =
2,267✔
558
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,267✔
559
  if (pMgmt->runngingHash == NULL) {
2,267!
560
    dError("failed to init vnode hash since %s", terrstr());
×
UNCOV
561
    return TSDB_CODE_OUT_OF_MEMORY;
×
562
  }
563

564
  pMgmt->closedHash =
2,267✔
565
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,267✔
566
  if (pMgmt->closedHash == NULL) {
2,267!
567
    dError("failed to init vnode closed hash since %s", terrstr());
×
UNCOV
568
    return TSDB_CODE_OUT_OF_MEMORY;
×
569
  }
570

571
  pMgmt->creatingHash =
2,267✔
572
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,267✔
573
  if (pMgmt->creatingHash == NULL) {
2,267!
574
    dError("failed to init vnode creatingHash hash since %s", terrstr());
×
UNCOV
575
    return TSDB_CODE_OUT_OF_MEMORY;
×
576
  }
577

578
  SWrapperCfg *pCfgs = NULL;
2,267✔
579
  int32_t      numOfVnodes = 0;
2,267✔
580
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
2,267!
581
    dInfo("failed to get vnode list from disk since %s", terrstr());
×
UNCOV
582
    return -1;
×
583
  }
584

585
  pMgmt->state.totalVnodes = numOfVnodes;
2,267✔
586

587
  int32_t threadNum = tsNumOfCores / 2;
2,267✔
588
  if (threadNum < 1) threadNum = 1;
2,267!
589
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
2,267✔
590

591
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
2,267!
592
  if (threads == NULL) {
2,267!
593
    dError("failed to allocate memory for threads since %s", terrstr());
×
594
    taosMemoryFree(pCfgs);
×
UNCOV
595
    return terrno;
×
596
  }
597

598
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
599
    threads[t].threadIndex = t;
45,340✔
600
    threads[t].pMgmt = pMgmt;
45,340✔
601
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
45,340!
602
  }
603

604
  for (int32_t v = 0; v < numOfVnodes; ++v) {
3,291✔
605
    int32_t       t = v % threadNum;
1,024✔
606
    SVnodeThread *pThread = &threads[t];
1,024✔
607
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
1,024✔
608
  }
609

610
  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);
2,267!
611

612
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
613
    SVnodeThread *pThread = &threads[t];
45,340✔
614
    if (pThread->vnodeNum == 0) continue;
45,340✔
615

616
    TdThreadAttr thAttr;
617
    (void)taosThreadAttrInit(&thAttr);
1,007✔
618
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,007✔
619
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
1,007!
UNCOV
620
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
×
621
    }
622

623
    (void)taosThreadAttrDestroy(&thAttr);
1,007✔
624
  }
625

626
  bool updateVnodesList = false;
2,267✔
627

628
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
629
    SVnodeThread *pThread = &threads[t];
45,340✔
630
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
45,340!
631
      (void)taosThreadJoin(pThread->thread, NULL);
1,007✔
632
      taosThreadClear(&pThread->thread);
1,007✔
633
    }
634
    taosMemoryFree(pThread->pCfgs);
45,340!
635
    if (pThread->updateVnodesList) updateVnodesList = true;
45,340!
636
  }
637
  taosMemoryFree(threads);
2,267!
638
  taosMemoryFree(pCfgs);
2,267!
639

640
  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
2,267!
641
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
×
642
    terrno = TSDB_CODE_VND_INIT_FAILED;
×
UNCOV
643
    return -1;
×
644
  }
645

646
  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
2,267!
647
    dError("failed to write vnode list since %s", terrstr());
×
UNCOV
648
    return -1;
×
649
  }
650

651
  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
2,267!
652
  return 0;
2,267✔
653
}
654

655
static void *vmCloseVnodeInThread(void *param) {
6,410✔
656
  SVnodeThread *pThread = param;
6,410✔
657
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
6,410✔
658

659
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
6,410!
660
  setThreadName("close-vnodes");
6,410✔
661

662
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
12,870✔
663
    SVnodeObj *pVnode = pThread->ppVnodes[v];
6,480✔
664

665
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
6,480✔
666
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
6,480✔
667
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
668
    tmsgReportStartup("vnode-close", stepDesc);
6,480✔
669

670
    vmCloseVnode(pMgmt, pVnode, false, false);
6,479✔
671
  }
672

673
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
6,390!
674
  return NULL;
6,390✔
675
}
676

677
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
2,267✔
678
  int32_t code = 0;
2,267✔
679
  dInfo("start to close all vnodes");
2,267!
680
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
2,267✔
681
  dInfo("vnodes mgmt worker is stopped");
2,267!
682
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
2,267✔
683
  dInfo("vnodes multiple mgmt worker is stopped");
2,267!
684

685
  int32_t     numOfVnodes = 0;
2,267✔
686
  SVnodeObj **ppVnodes = NULL;
2,267✔
687
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
2,267✔
688
  if (code != 0) {
2,267!
689
    dError("failed to get vnode list since %s", tstrerror(code));
×
UNCOV
690
    return;
×
691
  }
692

693
  int32_t threadNum = tsNumOfCores / 2;
2,267✔
694
  if (threadNum < 1) threadNum = 1;
2,267!
695
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
2,267✔
696

697
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
2,267!
698
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
699
    threads[t].threadIndex = t;
45,340✔
700
    threads[t].pMgmt = pMgmt;
45,340✔
701
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
45,340!
702
  }
703

704
  for (int32_t v = 0; v < numOfVnodes; ++v) {
8,754✔
705
    int32_t       t = v % threadNum;
6,487✔
706
    SVnodeThread *pThread = &threads[t];
6,487✔
707
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
6,487!
708
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
6,487✔
709
    }
710
  }
711

712
  pMgmt->state.openVnodes = 0;
2,267✔
713
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
2,267!
714

715
  int64_t st = taosGetTimestampMs();
2,267✔
716
  dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
2,267!
717
  if (ppVnodes != NULL) {
2,267!
718
    for (int32_t i = 0; i < numOfVnodes; ++i) {
8,754✔
719
      if (ppVnodes[i] != NULL) {
6,487!
720
        if (ppVnodes[i]->pImpl != NULL) {
6,487!
721
          tqNotifyClose(ppVnodes[i]->pImpl->pTq);
6,487✔
722
        }
723
      }
724
    }
725
  }
726

727
  int64_t et = taosGetTimestampMs();
2,267✔
728
  dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);
2,267!
729

730
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
731
    SVnodeThread *pThread = &threads[t];
45,340✔
732
    if (pThread->vnodeNum == 0) continue;
45,340✔
733

734
    TdThreadAttr thAttr;
735
    (void)taosThreadAttrInit(&thAttr);
6,410✔
736
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
6,410✔
737

738
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
6,410!
UNCOV
739
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
×
740
    }
741

742
    (void)taosThreadAttrDestroy(&thAttr);
6,410✔
743
  }
744

745
  for (int32_t t = 0; t < threadNum; ++t) {
47,587✔
746
    SVnodeThread *pThread = &threads[t];
45,321✔
747
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
45,321!
748
      (void)taosThreadJoin(pThread->thread, NULL);
6,391✔
749
      taosThreadClear(&pThread->thread);
6,390✔
750
    }
751
    taosMemoryFree(pThread->ppVnodes);
45,320!
752
  }
753
  taosMemoryFree(threads);
2,266!
754

755
  if (ppVnodes != NULL) {
2,266!
756
    taosMemoryFree(ppVnodes);
2,266!
757
  }
758

759
  if (pMgmt->runngingHash != NULL) {
2,266!
760
    taosHashCleanup(pMgmt->runngingHash);
2,266✔
761
    pMgmt->runngingHash = NULL;
2,266✔
762
  }
763

764
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
2,266✔
765
  while (pIter) {
2,266!
766
    SVnodeObj **ppVnode = pIter;
×
767
    vmFreeVnodeObj(ppVnode);
×
UNCOV
768
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
×
769
  }
770

771
  if (pMgmt->closedHash != NULL) {
2,266!
772
    taosHashCleanup(pMgmt->closedHash);
2,266✔
773
    pMgmt->closedHash = NULL;
2,266✔
774
  }
775

776
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
2,266✔
777
  while (pIter) {
2,266!
778
    SVnodeObj **ppVnode = pIter;
×
779
    vmFreeVnodeObj(ppVnode);
×
UNCOV
780
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
×
781
  }
782

783
  if (pMgmt->creatingHash != NULL) {
2,266!
784
    taosHashCleanup(pMgmt->creatingHash);
2,266✔
785
    pMgmt->creatingHash = NULL;
2,266✔
786
  }
787

788
  dInfo("total vnodes:%d are all closed", numOfVnodes);
2,266!
789
}
790

791
static void vmCleanup(SVnodeMgmt *pMgmt) {
2,267✔
792
  vmCloseVnodes(pMgmt);
2,267✔
793
  vmStopWorker(pMgmt);
2,266✔
794
  vnodeCleanup();
2,266✔
795
  (void)taosThreadRwlockDestroy(&pMgmt->lock);
2,266✔
796
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
2,266✔
797
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);
2,266✔
798
  taosMemoryFree(pMgmt);
2,266!
799
}
2,266✔
800

801
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
2,930✔
802
  int32_t     code = 0;
2,930✔
803
  int32_t     numOfVnodes = 0;
2,930✔
804
  SVnodeObj **ppVnodes = NULL;
2,930✔
805
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
2,930✔
806
  if (code != 0) {
2,930!
807
    dError("failed to get vnode list since %s", tstrerror(code));
×
UNCOV
808
    return;
×
809
  }
810

811
  if (ppVnodes != NULL) {
2,930!
812
    for (int32_t i = 0; i < numOfVnodes; ++i) {
36,191✔
813
      SVnodeObj *pVnode = ppVnodes[i];
33,261✔
814
      if (!pVnode->failed) {
33,261!
815
        vnodeSyncCheckTimeout(pVnode->pImpl);
33,261✔
816
      }
817
      vmReleaseVnode(pMgmt, pVnode);
33,261✔
818
    }
819
    taosMemoryFree(ppVnodes);
2,930!
820
  }
821
}
822

823
static void *vmThreadFp(void *param) {
2,267✔
824
  SVnodeMgmt *pMgmt = param;
2,267✔
825
  int64_t     lastTime = 0;
2,267✔
826
  setThreadName("vnode-timer");
2,267✔
827

828
  while (1) {
1,118,112✔
829
    lastTime++;
1,120,379✔
830
    taosMsleep(100);
1,120,379✔
831
    if (pMgmt->stop) break;
1,120,379✔
832
    if (lastTime % 10 != 0) continue;
1,118,112✔
833

834
    int64_t sec = lastTime / 10;
110,791✔
835
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
110,791✔
836
      vmCheckSyncTimeout(pMgmt);
2,930✔
837
    }
838
  }
839

840
  return NULL;
2,267✔
841
}
842

843
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
2,267✔
844
  int32_t      code = 0;
2,267✔
845
  TdThreadAttr thAttr;
846
  (void)taosThreadAttrInit(&thAttr);
2,267✔
847
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
2,267✔
848
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
2,267!
849
    code = TAOS_SYSTEM_ERROR(errno);
×
850
    dError("failed to create vnode timer thread since %s", tstrerror(code));
×
UNCOV
851
    return code;
×
852
  }
853

854
  (void)taosThreadAttrDestroy(&thAttr);
2,267✔
855
  return 0;
2,267✔
856
}
857

858
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
2,267✔
859
  pMgmt->stop = true;
2,267✔
860
  if (taosCheckPthreadValid(pMgmt->thread)) {
2,267!
861
    (void)taosThreadJoin(pMgmt->thread, NULL);
2,267✔
862
    taosThreadClear(&pMgmt->thread);
2,267✔
863
  }
864
}
2,267✔
865

866
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
2,267✔
867
  int32_t code = -1;
2,267✔
868

869
  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
2,267!
870
  if (pMgmt == NULL) {
2,267!
871
    code = terrno;
×
UNCOV
872
    goto _OVER;
×
873
  }
874

875
  pMgmt->pData = pInput->pData;
2,267✔
876
  pMgmt->path = pInput->path;
2,267✔
877
  pMgmt->name = pInput->name;
2,267✔
878
  pMgmt->msgCb = pInput->msgCb;
2,267✔
879
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
2,267✔
880
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
2,267✔
881
  pMgmt->msgCb.mgmt = pMgmt;
2,267✔
882

883
  code = taosThreadRwlockInit(&pMgmt->lock, NULL);
2,267✔
884
  if (code != 0) {
2,267!
885
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
886
    goto _OVER;
×
887
  }
888

889
  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
2,267✔
890
  if (code != 0) {
2,267!
891
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
892
    goto _OVER;
×
893
  }
894

895
  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
2,267✔
896
  if (code != 0) {
2,267!
897
    code = TAOS_SYSTEM_ERROR(errno);
×
UNCOV
898
    goto _OVER;
×
899
  }
900

901
  pMgmt->pTfs = pInput->pTfs;
2,267✔
902
  if (pMgmt->pTfs == NULL) {
2,267!
903
    dError("tfs is null.");
×
UNCOV
904
    goto _OVER;
×
905
  }
906
  tmsgReportStartup("vnode-tfs", "initialized");
2,267✔
907
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
2,267!
908
    dError("failed to init wal since %s", tstrerror(code));
×
UNCOV
909
    goto _OVER;
×
910
  }
911

912
  tmsgReportStartup("vnode-wal", "initialized");
2,267✔
913

914
  if ((code = syncInit()) != 0) {
2,267!
915
    dError("failed to open sync since %s", tstrerror(code));
×
UNCOV
916
    goto _OVER;
×
917
  }
918
  tmsgReportStartup("vnode-sync", "initialized");
2,267✔
919

920
  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
2,267!
921
    dError("failed to init vnode since %s", tstrerror(code));
×
UNCOV
922
    goto _OVER;
×
923
  }
924
  tmsgReportStartup("vnode-commit", "initialized");
2,267✔
925

926
  if ((code = vmStartWorker(pMgmt)) != 0) {
2,267!
927
    dError("failed to init workers since %s", tstrerror(code));
×
UNCOV
928
    goto _OVER;
×
929
  }
930
  tmsgReportStartup("vnode-worker", "initialized");
2,267✔
931

932
  if ((code = vmOpenVnodes(pMgmt)) != 0) {
2,267!
933
    dError("failed to open all vnodes since %s", tstrerror(code));
×
UNCOV
934
    goto _OVER;
×
935
  }
936
  tmsgReportStartup("vnode-vnodes", "initialized");
2,267✔
937

938
  if ((code = udfcOpen()) != 0) {
2,267!
939
    dError("failed to open udfc in vnode since %s", tstrerror(code));
×
UNCOV
940
    goto _OVER;
×
941
  }
942

943
  code = 0;
2,267✔
944

945
_OVER:
2,267✔
946
  if (code == 0) {
2,267!
947
    pOutput->pMgmt = pMgmt;
2,267✔
948
  } else {
949
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
×
UNCOV
950
    vmCleanup(pMgmt);
×
951
  }
952

953
  return code;
2,267✔
954
}
955

956
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,305✔
957
  *required = tsNumOfSupportVnodes > 0;
2,305✔
958
  return 0;
2,305✔
959
}
960

961
static void *vmRestoreVnodeInThread(void *param) {
1,007✔
962
  SVnodeThread *pThread = param;
1,007✔
963
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,007✔
964

965
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,007!
966
  setThreadName("restore-vnodes");
1,007✔
967

968
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,031✔
969
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,024✔
970
    if (pVnode->failed) {
1,024!
971
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
UNCOV
972
      continue;
×
973
    }
974

975
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,024✔
976
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,024✔
977
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
978
    tmsgReportStartup("vnode-restore", stepDesc);
1,024✔
979

980
    int32_t code = vnodeStart(pVnode->pImpl);
1,024✔
981
    if (code != 0) {
1,024!
982
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
UNCOV
983
      pThread->failed++;
×
984
    } else {
985
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,024!
986
      pThread->opened++;
1,024✔
987
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,024✔
988
    }
989
  }
990

991
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,007!
992
        pThread->failed);
993
  return NULL;
1,007✔
994
}
995

996
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
2,267✔
997
  int32_t     code = 0;
2,267✔
998
  int32_t     numOfVnodes = 0;
2,267✔
999
  SVnodeObj **ppVnodes = NULL;
2,267✔
1000
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
2,267✔
1001
  if (code != 0) {
2,267!
1002
    dError("failed to get vnode list since %s", tstrerror(code));
×
UNCOV
1003
    return code;
×
1004
  }
1005

1006
  int32_t threadNum = tsNumOfCores / 2;
2,267✔
1007
  if (threadNum < 1) threadNum = 1;
2,267!
1008
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
2,267✔
1009

1010
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
2,267!
1011
  if (threads == NULL) {
2,267!
UNCOV
1012
    return terrno;
×
1013
  }
1014

1015
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
1016
    threads[t].threadIndex = t;
45,340✔
1017
    threads[t].pMgmt = pMgmt;
45,340✔
1018
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
45,340!
1019
    if (threads[t].ppVnodes == NULL) {
45,340!
1020
      code = terrno;
×
UNCOV
1021
      break;
×
1022
    }
1023
  }
1024

1025
  for (int32_t v = 0; v < numOfVnodes; ++v) {
3,291✔
1026
    int32_t       t = v % threadNum;
1,024✔
1027
    SVnodeThread *pThread = &threads[t];
1,024✔
1028
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
1,024!
1029
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
1,024✔
1030
    }
1031
  }
1032

1033
  pMgmt->state.openVnodes = 0;
2,267✔
1034
  pMgmt->state.dropVnodes = 0;
2,267✔
1035
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
2,267!
1036

1037
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
1038
    SVnodeThread *pThread = &threads[t];
45,340✔
1039
    if (pThread->vnodeNum == 0) continue;
45,340✔
1040

1041
    TdThreadAttr thAttr;
1042
    (void)taosThreadAttrInit(&thAttr);
1,007✔
1043
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,007✔
1044
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
1,007!
UNCOV
1045
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
×
1046
    }
1047

1048
    (void)taosThreadAttrDestroy(&thAttr);
1,007✔
1049
  }
1050

1051
  for (int32_t t = 0; t < threadNum; ++t) {
47,607✔
1052
    SVnodeThread *pThread = &threads[t];
45,340✔
1053
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
45,340!
1054
      (void)taosThreadJoin(pThread->thread, NULL);
1,007✔
1055
      taosThreadClear(&pThread->thread);
1,007✔
1056
    }
1057
    taosMemoryFree(pThread->ppVnodes);
45,340!
1058
  }
1059
  taosMemoryFree(threads);
2,267!
1060

1061
  for (int32_t i = 0; i < numOfVnodes; ++i) {
3,291✔
1062
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
1,024!
1063
    vmReleaseVnode(pMgmt, ppVnodes[i]);
1,024✔
1064
  }
1065

1066
  if (ppVnodes != NULL) {
2,267!
1067
    taosMemoryFree(ppVnodes);
2,267!
1068
  }
1069

1070
  return vmInitTimer(pMgmt);
2,267✔
1071

1072
_exit:
1073
  for (int32_t t = 0; t < threadNum; ++t) {
1074
    SVnodeThread *pThread = &threads[t];
1075
    taosMemoryFree(pThread->ppVnodes);
1076
  }
1077
  taosMemoryFree(threads);
1078
  return code;
1079
}
1080

1081
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
2,267✔
1082

1083
SMgmtFunc vmGetMgmtFunc() {
2,305✔
1084
  SMgmtFunc mgmtFunc = {0};
2,305✔
1085
  mgmtFunc.openFp = vmInit;
2,305✔
1086
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
2,305✔
1087
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
2,305✔
1088
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
2,305✔
1089
  mgmtFunc.requiredFp = vmRequire;
2,305✔
1090
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
2,305✔
1091

1092
  return mgmtFunc;
2,305✔
1093
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc