• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.04
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Report the primary disk id recorded for vgroup `vgId`.
 *
 * The vnode hash is consulted while holding the management read lock.
 * Returns the vnode's diskPrimary, or -1 when no vnode with this vgId is
 * currently registered in pMgmt->hash.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t primary = (pFound == NULL) ? -1 : pFound->diskPrimary;
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return primary;
}
35

36
/*
 * Choose the level-0 disk that should host vgroup `vgId`.
 *
 * Resolution order:
 *   1. If the vnode info file (or its temporary variant) already exists on a
 *      level-0 disk, that disk id is returned and nothing is registered.
 *   2. Otherwise the level-0 disk with the fewest vnodes (as listed by
 *      vmGetAllVnodeListFromHashWithCreating()) is chosen, and a placeholder
 *      SVnodeObj carrying that disk is put into pMgmt->creatingHash.
 *
 * Returns the chosen disk id on success, 0 when no tiered file system is
 * configured, or a non-zero error code on failure.
 * NOTE(review): callers presumably tell errors apart by sign (TSDB error codes
 * are negative) — confirm.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  int32_t r = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs: keep the vnode on the disk that already holds its info file
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc: pick the least-loaded level-0 disk
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  int32_t     ndisk = 0;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;
    // diskPrimary indexes a fixed-size stack array; an out-of-range value (e.g.
    // from a stale or corrupted vnode config) would otherwise write out of bounds.
    if (pVnode->diskPrimary < 0 || pVnode->diskPrimary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, skip vnode:%d with invalid primary disk:%d", vgId, pVnode->vgId, pVnode->diskPrimary);
      continue;
    }
    disks[pVnode->diskPrimary] += 1;
  }

  int32_t minVal = INT_MAX;
  ndisk = tfsGetDisksAtLevel(pTfs, 0);
  // Never read past the histogram, whatever the tier reports.
  if (ndisk > TFS_MAX_DISKS_PER_TIER) ndisk = TFS_MAX_DISKS_PER_TIER;
  diskId = 0;
  for (int32_t id = 0; id < ndisk; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  // Register a placeholder for the vnode being created. Presumably this lets
  // later allocations count it via vmGetAllVnodeListFromHashWithCreating().
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pCreatingVnode == NULL) {
    code = (terrno != 0) ? terrno : -1;
    dError("failed to alloc vnode since %s", tstrerror(code));
    goto _UNLOCK;
  }
  pCreatingVnode->vgId = vgId;
  pCreatingVnode->diskPrimary = diskId;

  code = taosThreadRwlockWrlock(&pMgmt->lock);
  if (code != 0) {
    taosMemoryFree(pCreatingVnode);
    goto _UNLOCK;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pCreatingVnode);
  }

  r = taosThreadRwlockUnlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }

_UNLOCK:
  // Keep the first error. Previously the mutex-unlock result overwrote `code`,
  // so a failed taosHashPut() was reported as a successful allocation.
  r = taosThreadMutexUnlock(&pMgmt->mutex);
  if (r != 0) {
    dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    if (code == 0) code = r;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  }

  dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
  return diskId;
}
156

157
/*
 * Look up vnode `vgId` in pMgmt->hash and take a reference on it.
 *
 * When `strict` is true, vnodes flagged as dropped or failed are treated as
 * absent. On success the vnode's refCount is atomically incremented and the
 * object is returned; balance it with vmReleaseVnode(). Otherwise NULL is
 * returned and terrno is set to TSDB_CODE_VND_INVALID_VGROUP_ID.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pObj = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pObj);

  bool usable = (pObj != NULL);
  if (usable && strict) {
    usable = !(pObj->dropped || pObj->failed);
  }

  if (usable) {
    int32_t refs = atomic_add_fetch_32(&pObj->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pObj = NULL;
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);
  return pObj;
}
173

174
/* Strict acquire: vnodes marked dropped or failed are reported as not found. */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
46,225,291✔
175

176
/*
 * Drop one reference taken by vmAcquireVnode()/vmAcquireVnodeImpl().
 * Passing NULL is a no-op. The object is not freed here: vmCloseVnode() and
 * vmFreeVnodeObj() wait for refCount to reach zero before tearing it down.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
184

185
/*
 * Free a vnode object (its path string and the struct itself) and clear the
 * caller's pointer.
 *
 * While the object is still referenced, this blocks and re-checks refCount
 * every 200ms. A NULL `ppVnode` or NULL `*ppVnode` is ignored.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  for (int32_t refs = atomic_load_32(&pObj->refCount); refs > 0; refs = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
201

202
/*
 * Register an opened vnode with the management layer.
 *
 * Builds an SVnodeObj from `pCfg`. When `pImpl` is non-NULL, its worker queues
 * are allocated; a NULL `pImpl` registers the vnode as failed. The object is
 * inserted into pMgmt->hash, replacing any previous object with the same vgId,
 * and any stale record for the vgId is dropped from pMgmt->closedHash.
 *
 * Returns 0 on success, -1 with terrno set on allocation failure, or the error
 * returned by taosHashPut(). `pImpl` is never closed here.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->diskPrimary = pCfg->diskPrimary;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->failed = 0;
  pVnode->path = taosStrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (pImpl) {
    if (vmAllocQueue(pMgmt, pVnode) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pVnode->path);
      taosMemoryFree(pVnode);
      return -1;
    }
  } else {
    pVnode->failed = 1;
  }

  int32_t vgId = pVnode->vgId;

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from hash", vgId);
  }

  // Insert first and free the replaced object only afterwards. Freeing it
  // before the put would leave a dangling pointer in the hash if the put failed.
  int32_t code = taosHashPut(pMgmt->hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to hash since %s", vgId, tstrerror(code));
    (void)taosThreadRwlockUnlock(&pMgmt->lock);
    if (pImpl == NULL) {
      // No worker queues were allocated, so the object can be released directly.
      vmFreeVnodeObj(&pVnode);
    }
    // NOTE(review): when pImpl != NULL the object and its queues are still
    // leaked; releasing them needs the worker teardown done in vmCloseVnode().
    return code;
  }
  if (pOld != NULL) {
    vmFreeVnodeObj(&pOld);
  }

  // Drop a stale "closed" record for this vgId, if there is one. pOld stays
  // NULL when no entry exists. Removing unconditionally logged a spurious error
  // for every vnode that had no closed record.
  pOld = NULL;
  r = taosHashGetDup(pMgmt->closedHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", vgId);
  }
  if (pOld != NULL) {
    dInfo("vgId:%d, remove from closedHash", vgId);
    r = taosHashRemove(pMgmt->closedHash, &vgId, sizeof(int32_t));
    if (r != 0) {
      // Keep the object: closedHash still points to it, and vmCloseVnodes()
      // frees every closedHash entry at shutdown.
      dError("vgId:%d, failed to remove vnode from closedHash", vgId);
    } else {
      vmFreeVnodeObj(&pOld);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return code;
}
264

265
/*
 * Close vnode `pVnode` and free its SVnodeObj.
 *
 * Steps:
 *   - If this replica is leader, propose a commit.
 *   - Unregister the vnode from pMgmt->hash. When `keepClosed` is set, record a
 *     lightweight copy in pMgmt->closedHash.
 *   - Drop the caller's reference.
 *   - Drain and tear down the worker queues, then close the vnode
 *     implementation.
 *   - When `commitAndRemoveWal` is set, commit first and recreate an empty WAL
 *     directory.
 *   - If the vnode is marked dropped, destroy its on-disk data.
 *
 * The caller must hold one reference on `pVnode`. That reference is consumed,
 * and the object is freed before returning.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // nodeId is passed to vnodeDestroy() below. Failed vnodes jump straight to
  // _closed without querying it, so it needs a defined value (reading an
  // uninitialized local is undefined behavior). 0 matches what
  // vmOpenVnodeInThread() passes when destroying dropped vnodes.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
  if (keepClosed) {
    SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
    if (pClosedVnode == NULL) {
      // Only the closed-vnode record is lost. Returning here (as before) skipped
      // the rest of the close: the reference was never dropped, the workers kept
      // running and the vnode object leaked.
      dError("failed to alloc vnode since %s", terrstr());
    } else {
      pClosedVnode->vgId = pVnode->vgId;
      pClosedVnode->dropped = pVnode->dropped;
      pClosedVnode->vgVersion = pVnode->vgVersion;
      pClosedVnode->diskPrimary = pVnode->diskPrimary;
      pClosedVnode->toVgId = pVnode->toVgId;

      SVnodeObj *pOld = NULL;
      r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
      if (r != 0) {
        dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
      }
      dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
      r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
      if (r != 0) {
        dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
        taosMemoryFree(pClosedVnode);
      } else if (pOld != NULL) {
        // Free the replaced record only once the new one is stored, so a failed
        // put never leaves a freed pointer inside closedHash.
        vmFreeVnodeObj(&pOld);
      }
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  // Drop the reference held by the caller.
  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
391

392
/*
 * Apply a pending vgroup id change recorded in `pCfg`.
 *
 * When pCfg->toVgId is 0 there is nothing to do. Otherwise vnodeRestoreVgroupId()
 * is asked to restore the vnode stored under "vnode<vgId>" as "vnode<toVgId>".
 * On success pCfg->vgId takes the returned id and pCfg->toVgId is cleared.
 * Returns 0 when nothing was pending or on success, and -1 on failure.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromId = pCfg->vgId;
  const int32_t toId = pCfg->toVgId;
  if (toId == 0) {
    return 0;
  }

  char fromPath[TSDB_FILENAME_LEN];
  char toPath[TSDB_FILENAME_LEN];
  snprintf(fromPath, sizeof(fromPath), "vnode%svnode%d", TD_DIRSEP, fromId);
  snprintf(toPath, sizeof(toPath), "vnode%svnode%d", TD_DIRSEP, toId);

  int32_t restored = vnodeRestoreVgroupId(fromPath, toPath, fromId, toId, pCfg->diskPrimary, pTfs);
  if (restored > 0) {
    pCfg->vgId = restored;
    pCfg->toVgId = 0;
    return 0;
  }

  dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, fromPath);
  return -1;
}
414

415
static void *vmOpenVnodeInThread(void *param) {
1,066✔
416
  SVnodeThread *pThread = param;
1,066✔
417
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,066✔
418
  char          path[TSDB_FILENAME_LEN];
419

420
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,066!
421
  setThreadName("open-vnodes");
1,066✔
422

423
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,132✔
424
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,066✔
425
    if (pCfg->dropped) {
1,066!
426
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
427
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
428
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
429
      tmsgReportStartup("vnode-destroy", stepDesc);
×
430

431
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
432
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
433
      pThread->updateVnodesList = true;
×
434
      pThread->dropped++;
×
435
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
436
      continue;
×
437
    }
438

439
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,066✔
440
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,066✔
441
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
442
    tmsgReportStartup("vnode-open", stepDesc);
1,066✔
443

444
    if (pCfg->toVgId) {
1,066!
445
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
446
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
447
        pThread->failed++;
×
448
        continue;
×
449
      }
450
      pThread->updateVnodesList = true;
×
451
    }
452

453
    int32_t diskPrimary = pCfg->diskPrimary;
1,066✔
454
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,066✔
455

456
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,066✔
457

458
    if (pImpl == NULL) {
1,066!
459
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
460
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
461
        pThread->failed++;
×
462
        continue;
×
463
      }
464
    }
465

466
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,066!
467
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
468
      pThread->failed++;
×
469
      continue;
×
470
    }
471

472
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,066!
473
    pThread->opened++;
1,066✔
474
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,066✔
475
  }
476

477
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,066!
478
        pThread->opened, pThread->dropped, pThread->failed);
479
  return NULL;
1,066✔
480
}
481

482
/*
 * Create the vnode hashes (hash, closedHash, creatingHash) and open, in
 * parallel, every vnode listed by vmGetVnodeListFromFile().
 *
 * Vnodes are distributed round-robin over max(tsNumOfCores / 2, 1) threads
 * running vmOpenVnodeInThread(). Returns 0 when every vnode was either opened
 * or dropped, and the vnode list is rewritten if a thread changed it. Otherwise
 * returns an error code (or -1 with terrno set).
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // The distribution loop below writes into every per-thread array, so a
      // NULL array would be dereferenced there.
      int32_t code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to allocate memory for thread:%d since %s", t, tstrerror(code));
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    terrno = TSDB_CODE_VND_INIT_FAILED;
    return -1;
  }

  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("failed to write vnode list since %s", terrstr());
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
579

580
/*
 * Remove the placeholder for `vgId` from pMgmt->creatingHash (vmAllocPrimaryDisk()
 * registers it there) and free it.
 *
 * The object is freed only after it has been removed from the hash, so the hash
 * never holds a dangling pointer. An entry that could not be removed is left
 * for vmCloseVnodes(), which frees all creatingHash entries.
 */
void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
    return;
  }

  if (pOld != NULL) {
    // Log the object address, not the address of the local variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
11,066✔
605

606
static void *vmCloseVnodeInThread(void *param) {
6,968✔
607
  SVnodeThread *pThread = param;
6,968✔
608
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
6,968✔
609

610
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
6,968✔
611
  setThreadName("close-vnodes");
6,970✔
612

613
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
13,987✔
614
    SVnodeObj *pVnode = pThread->ppVnodes[v];
7,018✔
615

616
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
7,018✔
617
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
7,018✔
618
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
619
    tmsgReportStartup("vnode-close", stepDesc);
7,018✔
620

621
    vmCloseVnode(pMgmt, pVnode, false, false);
7,017✔
622
  }
623

624
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
6,969!
625
  return NULL;
6,969✔
626
}
627

628
/*
 * Stop the management workers, close every registered vnode (in parallel on
 * max(tsNumOfCores / 2, 1) threads), then free the contents of closedHash and
 * creatingHash and clean up all three hashes.
 *
 * If thread slots cannot be allocated or a thread cannot be started, the
 * affected vnodes are closed serially on the calling thread. They must not be
 * left open with their references held while the hashes are destroyed.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    // Without thread slots every vnode is closed serially below.
    dError("failed to allocate memory for threads since %s, close vnodes serially", terrstr());
    threadNum = 0;
  }
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
    SVnodeThread *pThread = (threadNum > 0) ? &threads[v % threadNum] : NULL;
    if (pThread != NULL && pThread->ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    } else {
      // No per-thread slot is available: close the vnode here instead of
      // leaving it open when the hash is cleaned up below.
      vmCloseVnode(pMgmt, ppVnodes[v], false, false);
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
      // Close this slot's vnodes on the current thread instead.
      for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
        vmCloseVnode(pMgmt, pThread->ppVnodes[v], false, false);
      }
      pThread->vnodeNum = 0;  // nothing to join for this slot
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  if (threads != NULL) {
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
  }

  if (pMgmt->closedHash != NULL) {
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
  }

  if (pMgmt->creatingHash != NULL) {
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
725

726
/*
 * Tear down the vnode management module.
 *
 * Closes all vnodes, stops the workers, runs vnodeCleanup(), destroys the
 * management locks and finally frees `pMgmt` itself.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  // Vnodes and workers go first: they may still use the locks below.
  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();

  (void)taosThreadRwlockDestroy(&pMgmt->lock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);

  taosMemoryFree(pMgmt);
}
2,400✔
735

736
/*
 * Run vnodeSyncCheckTimeout() on every registered vnode that is not marked
 * failed. Each vnode returned by vmGetVnodeListFromHash() is released, and the
 * list is freed afterwards.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     count = 0;
  SVnodeObj **ppList = NULL;

  int32_t code = vmGetVnodeListFromHash(pMgmt, &count, &ppList);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }
  if (ppList == NULL) {
    return;
  }

  for (int32_t i = 0; i < count; ++i) {
    SVnodeObj *pCur = ppList[i];
    if (!pCur->failed) {
      vnodeSyncCheckTimeout(pCur->pImpl);
    }
    vmReleaseVnode(pMgmt, pCur);
  }
  taosMemoryFree(ppList);
}
757

758
static void *vmThreadFp(void *param) {
2,400✔
759
  SVnodeMgmt *pMgmt = param;
2,400✔
760
  int64_t     lastTime = 0;
2,400✔
761
  setThreadName("vnode-timer");
2,400✔
762

763
  while (1) {
1,067,458✔
764
    lastTime++;
1,069,858✔
765
    taosMsleep(100);
1,069,858✔
766
    if (pMgmt->stop) break;
1,069,858✔
767
    if (lastTime % 10 != 0) continue;
1,067,458✔
768

769
    int64_t sec = lastTime / 10;
105,693✔
770
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
105,693✔
771
      vmCheckSyncTimeout(pMgmt);
2,750✔
772
    }
773
  }
774

775
  return NULL;
2,400✔
776
}
777

778
/*
 * Start the joinable "vnode-timer" thread (vmThreadFp) and store its handle
 * in pMgmt->thread.
 *
 * Returns 0 on success. If thread creation fails, returns
 * TAOS_SYSTEM_ERROR(errno) and logs the failure.
 *
 * The thread attribute object is destroyed on every path. The original code
 * returned early on failure and leaked it.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;

  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture errno before any further call has a chance to overwrite it.
    code = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
792

793
/*
 * Stop the "vnode-timer" thread.
 *
 * Raises pMgmt->stop, which vmThreadFp polls after each 100 ms sleep. If the
 * thread handle is valid, it then joins the thread and clears the handle.
 *
 * NOTE(review): `stop` is a plain field written here and read by another
 * thread. Confirm whether it should be atomic.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
800

801
/*
 * Open the vnode management module.
 *
 * Steps:
 *   - allocate an SVnodeMgmt;
 *   - copy the shared inputs from pInput;
 *   - route the message callbacks (putToQueueFp, qsizeFp) to this module;
 *   - initialise the rwlock and the two mutexes;
 *   - bring up WAL, sync, the vnode layer, the worker pools, all local vnodes
 *     and the UDF client, reporting each step through tmsgReportStartup().
 *
 * On success, pOutput->pMgmt receives the new object and 0 is returned.
 * On failure, the error is logged, the partially built object (if any) is
 * released through vmCleanup(), and a non-zero code is returned.
 *
 * NOTE(review): vmCleanup() runs its full teardown even when an early step
 * failed (e.g. before vnodeInit()/vmStartWorker() ran). Confirm that those
 * callees tolerate being torn down uninitialised.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->lock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    // `code` is still 0 from the successful mutex init above. Without an
    // explicit error, _OVER would treat this failure as success and publish a
    // half-initialised object.
    code = TAOS_SYSTEM_ERROR(EINVAL);
    dError("tfs is null.");
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is NULL when the very first allocation failed; there is nothing to tear down.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
890

891
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,436✔
892
  *required = tsNumOfSupportVnodes > 0;
2,436✔
893
  return 0;
2,436✔
894
}
895

896
static void *vmRestoreVnodeInThread(void *param) {
1,065✔
897
  SVnodeThread *pThread = param;
1,065✔
898
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,065✔
899

900
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,065!
901
  setThreadName("restore-vnodes");
1,065✔
902

903
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,132✔
904
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,066✔
905
    if (pVnode->failed) {
1,066!
906
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
907
      continue;
×
908
    }
909

910
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,066✔
911
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,066✔
912
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
913
    tmsgReportStartup("vnode-restore", stepDesc);
1,066✔
914

915
    int32_t code = vnodeStart(pVnode->pImpl);
1,066✔
916
    if (code != 0) {
1,066!
917
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
918
      pThread->failed++;
×
919
    } else {
920
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,066!
921
      pThread->opened++;
1,066✔
922
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,066✔
923
    }
924
  }
925

926
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,066!
927
        pThread->failed);
928
  return NULL;
1,066✔
929
}
930

931
/*
 * Start (restore) every vnode registered in pMgmt->hash using a pool of
 * worker threads, then start the vnode timer thread.
 *
 * How it works:
 *   - Vnodes are distributed round-robin across
 *     min(tsNumOfCores / 2, numOfVnodes) threads, with at least one thread.
 *   - Each thread runs vmRestoreVnodeInThread().
 *   - All threads are joined before the vnode references obtained from
 *     vmGetVnodeListFromHash() are released.
 *
 * Returns the result of vmInitTimer() on success. If the vnode list or the
 * per-thread bookkeeping cannot be allocated, returns that error instead.
 * All references and memory are released on every path. Individual vnode
 * restore failures are only logged by the worker threads.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  // Use up to half of the cores, but never more threads than there are vnodes (and at least one).
  threadNum = tsNumOfCores / 2;
  if (threadNum > numOfVnodes) threadNum = numOfVnodes;
  if (threadNum < 1) threadNum = 1;
  // Round-robin assignment gives each thread at most ceil(numOfVnodes / threadNum) vnodes.
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      // Abort instead of silently leaving this thread's share of vnodes unrestored.
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
      // POSIX leaves the handle undefined after a failed create. Clear it so the
      // join loop below does not join garbage (assumes taosThreadClear
      // invalidates the handle, as it is relied on after joins).
      taosThreadClear(&pThread->thread);
    }
    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  // Drop the references taken by vmGetVnodeListFromHash(), on success and failure alike.
  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] != NULL) {
        vmReleaseVnode(pMgmt, ppVnodes[i]);
      }
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    return code;
  }
  return vmInitTimer(pMgmt);
}
1015

1016
/*
 * Module stop hook (registered as stopFp). Stops and joins the vnode timer
 * thread via vmCleanupTimer(); all other teardown is left to vmCleanup().
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
1017

1018
SMgmtFunc vmGetMgmtFunc() {
2,436✔
1019
  SMgmtFunc mgmtFunc = {0};
2,436✔
1020
  mgmtFunc.openFp = vmInit;
2,436✔
1021
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
2,436✔
1022
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
2,436✔
1023
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
2,436✔
1024
  mgmtFunc.requiredFp = vmRequire;
2,436✔
1025
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
2,436✔
1026

1027
  return mgmtFunc;
2,436✔
1028
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc