• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.69
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Look up the primary disk id recorded for vnode `vgId` in pMgmt->hash.
 *
 * The lookup runs while pMgmt->lock is held for reading.
 * Returns the entry's diskPrimary, or -1 when no entry was found.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  // The status code is ignored; the outcome is judged by whether pFound was filled in.
  (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t primary = (pFound == NULL) ? -1 : pFound->diskPrimary;
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return primary;
}
35

36
/*
 * Choose the level-0 disk for vnode `vgId` and register a placeholder for it
 * in pMgmt->creatingHash.
 *
 * Steps:
 *   1. Without a tiered file system (pMgmt->pTfs == NULL) disk 0 is returned.
 *   2. If the vnode info file (or its temporary variant) already exists on a
 *      level-0 disk, that disk id is returned unchanged.
 *   3. Otherwise the vnodes returned by vmGetAllVnodeListFromHashWithCreating()
 *      are counted per disk and the level-0 disk with the fewest vnodes is
 *      picked. A zeroed SVnodeObj carrying vgId/diskPrimary is then put into
 *      pMgmt->creatingHash.
 *
 * Returns the chosen disk id on success, or a non-zero error code when locking
 * or allocation fails (callers receive both through the same int32_t).
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  int32_t     ndisk = 0;
  SVnodeObj **ppVnodes = NULL;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  // Count vnodes per disk. The disk id is validated first: an out-of-range value
  // would otherwise write outside the stack array `disks`.
  for (int32_t v = 0; v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;
    if (pVnode->diskPrimary < 0 || pVnode->diskPrimary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, ignore invalid primary disk:%d while allocating disk", pVnode->vgId, pVnode->diskPrimary);
      continue;
    }
    disks[pVnode->diskPrimary] += 1;
  }

  // Pick the least loaded disk; ties go to the lowest id, and disk 0 is the fallback.
  int32_t minVal = INT_MAX;
  ndisk = tfsGetDisksAtLevel(pTfs, 0);
  diskId = 0;
  for (int32_t id = 0; id < ndisk; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  // taosMemoryCalloc returns zeroed memory, so no extra memset is needed.
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pCreatingVnode == NULL) {
    code = -1;
    if (terrno != 0) code = terrno;
    dError("failed to alloc vnode since %s", tstrerror(code));
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    goto _OVER;
  }

  pCreatingVnode->vgId = vgId;
  pCreatingVnode->diskPrimary = diskId;

  code = taosThreadRwlockWrlock(&pMgmt->lock);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    taosMemoryFree(pCreatingVnode);
    goto _OVER;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
  // NOTE(review): as before, a failed insert is only logged and the placeholder freed;
  // the function result is decided by the mutex unlock below. Confirm this is intended.
  int32_t putCode = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
  if (putCode != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pCreatingVnode);
  }

  int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }

  code = taosThreadMutexUnlock(&pMgmt->mutex);

_OVER:
  // Release every vnode obtained from the list, then the list itself.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  } else {
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
    return diskId;
  }
}
156

157
/*
 * Find vnode `vgId` in pMgmt->hash and take a reference on it.
 *
 * With `strict` set, a vnode marked dropped or failed is treated as missing.
 * On success the vnode's refCount is incremented and the vnode is returned.
 * Otherwise terrno is set to TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is returned.
 * The whole operation runs under the read side of pMgmt->lock.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);

  bool usable = (pFound != NULL);
  if (usable && strict) {
    usable = !(pFound->dropped || pFound->failed);
  }

  if (usable) {
    int32_t refs = atomic_add_fetch_32(&pFound->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pFound->vgId, pFound, refs);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pFound = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return pFound;
}
173

174
/*
 * Strict variant of vmAcquireVnodeImpl(): dropped or failed vnodes are not returned.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
54,517,112✔
175

176
/*
 * Drop one reference from `pVnode` (no-op for NULL).
 *
 * The decrement is atomic and pMgmt->lock is not taken; `pMgmt` itself is unused here.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
184

185
/*
 * Free the vnode object referenced by *ppVnode and reset *ppVnode to NULL.
 *
 * Blocks, polling every 200ms, until the object's refCount is no longer positive.
 * Does nothing when ppVnode or *ppVnode is NULL.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  for (int32_t refs = atomic_load_32(&pObj->refCount); refs > 0; refs = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
201

202
/*
 * Create the management object for a vnode and publish it in pMgmt->hash.
 *
 * pCfg  - supplies vgId, vgVersion, diskPrimary and path for the new object.
 * pImpl - the opened vnode; when NULL the object is registered with failed = 1
 *         and no queues are allocated.
 *
 * An older object under the same vgId in pMgmt->hash is freed before the new one
 * is inserted, and an entry in pMgmt->closedHash for the same vgId is freed and removed.
 *
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when allocation or queue setup
 * fails, otherwise the result of inserting into pMgmt->hash.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pNew = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pNew->vgId = pCfg->vgId;
  pNew->vgVersion = pCfg->vgVersion;
  pNew->diskPrimary = pCfg->diskPrimary;
  pNew->refCount = 0;
  pNew->dropped = 0;
  pNew->failed = 0;
  pNew->path = taosStrdup(pCfg->path);
  pNew->pImpl = pImpl;

  if (pNew->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pNew);
    return -1;
  }

  if (pImpl == NULL) {
    pNew->failed = 1;
  } else if (vmAllocQueue(pMgmt, pNew) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pNew->path);
    taosMemoryFree(pNew);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);

  // Replace any previous object registered under the same vgId.
  SVnodeObj *pStale = NULL;
  if (taosHashGetDup(pMgmt->hash, &pNew->vgId, sizeof(int32_t), (void *)&pStale) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pNew->vgId);
  }
  if (pStale != NULL) {
    vmFreeVnodeObj(&pStale);
  }
  // NOTE(review): if this insert fails pNew is not released here — confirm who owns it then.
  int32_t code = taosHashPut(pMgmt->hash, &pNew->vgId, sizeof(int32_t), &pNew, sizeof(SVnodeObj *));

  // The vnode is open again, so drop its record from the closed set.
  pStale = NULL;
  if (taosHashGetDup(pMgmt->closedHash, &pNew->vgId, sizeof(int32_t), (void *)&pStale) != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pNew->vgId);
  }
  if (pStale != NULL) {
    vmFreeVnodeObj(&pStale);
    dInfo("vgId:%d, remove from closedHash", pNew->vgId);
    if (taosHashRemove(pMgmt->closedHash, &pNew->vgId, sizeof(int32_t)) != 0) {
      dError("vgId:%d, failed to remove vnode from hash", pNew->vgId);
    }
  }

  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return code;
}
266

267
/*
 * Close a vnode, drain its queues and free its management object.
 *
 * pVnode             - vnode to close; the reference held on it is released here
 *                      and the object is freed before returning.
 * commitAndRemoveWal - commit data synchronously before closing and recreate an
 *                      empty wal directory afterwards.
 * keepClosed         - leave a lightweight record (vgId, dropped, vgVersion,
 *                      diskPrimary, toVgId) in pMgmt->closedHash.
 *
 * A vnode marked `failed` skips the pre-close / queue / close steps and goes
 * straight to the final cleanup.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Initialised up front: the `goto _closed` below skips the assignment from
  // vnodeNodeId(), yet nodeId is still read when a failed vnode is also dropped.
  // 0 is the value vmOpenVnodeInThread passes to vnodeDestroy for unopened vnodes.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
  if (keepClosed) {
    // taosMemoryCalloc returns zeroed memory, so no extra memset is needed.
    SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
    if (pClosedVnode == NULL) {
      // NOTE(review): returning here leaves the vnode removed from the hash but not closed.
      dError("failed to alloc vnode since %s", terrstr());
      (void)taosThreadRwlockUnlock(&pMgmt->lock);
      return;
    }

    pClosedVnode->vgId = pVnode->vgId;
    pClosedVnode->dropped = pVnode->dropped;
    pClosedVnode->vgVersion = pVnode->vgVersion;
    pClosedVnode->diskPrimary = pVnode->diskPrimary;
    pClosedVnode->toVgId = pVnode->toVgId;

    SVnodeObj *pOld = NULL;
    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
    if (r != 0) {
      dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
    }
    if (pOld) {
      vmFreeVnodeObj(&pOld);
    }
    dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
    r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
    if (r != 0) {
      dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
      // The record was not stored, so it must be freed here (same handling as
      // the creatingHash insert in vmAllocPrimaryDisk).
      taosMemoryFree(pClosedVnode);
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  // Read atomically, matching how vmFreeVnodeObj polls the same counter.
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  // Capture the node id before the implementation is closed.
  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
393

394
/*
 * Finish a pending vgroup id change recorded in `pCfg` (toVgId != 0).
 *
 * Calls vnodeRestoreVgroupId() with the source and destination vnode paths.
 * On success pCfg->vgId takes the returned id and pCfg->toVgId is cleared.
 * Returns 0 on success or when nothing is pending, -1 on failure.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromVgId = pCfg->vgId;
  const int32_t targetVgId = pCfg->toVgId;
  if (targetVgId == 0) return 0;

  char srcPath[TSDB_FILENAME_LEN];
  char dstPath[TSDB_FILENAME_LEN];
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, targetVgId);

  int32_t restored = vnodeRestoreVgroupId(srcPath, dstPath, fromVgId, targetVgId, pCfg->diskPrimary, pTfs);
  if (restored > 0) {
    pCfg->vgId = restored;
    pCfg->toVgId = 0;
    return 0;
  }

  dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
  return -1;
}
416

417
static void *vmOpenVnodeInThread(void *param) {
1,054✔
418
  SVnodeThread *pThread = param;
1,054✔
419
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,054✔
420
  char          path[TSDB_FILENAME_LEN];
421

422
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,054!
423
  setThreadName("open-vnodes");
1,054✔
424

425
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,126✔
426
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,072✔
427
    if (pCfg->dropped) {
1,072!
UNCOV
428
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
429
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
430
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
431
      tmsgReportStartup("vnode-destroy", stepDesc);
×
432

433
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
434
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
435
      pThread->updateVnodesList = true;
×
436
      pThread->dropped++;
×
UNCOV
437
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
UNCOV
438
      continue;
×
439
    }
440

441
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,072✔
442
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,072✔
443
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
444
    tmsgReportStartup("vnode-open", stepDesc);
1,072✔
445

446
    if (pCfg->toVgId) {
1,072!
447
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
448
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
UNCOV
449
        pThread->failed++;
×
450
        continue;
×
451
      }
UNCOV
452
      pThread->updateVnodesList = true;
×
453
    }
454

455
    int32_t diskPrimary = pCfg->diskPrimary;
1,072✔
456
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,072✔
457

458
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,072✔
459

460
    if (pImpl == NULL) {
1,072!
461
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
462
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
UNCOV
463
        pThread->failed++;
×
UNCOV
464
        continue;
×
465
      }
466
    }
467

468
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,072!
469
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
UNCOV
470
      pThread->failed++;
×
UNCOV
471
      continue;
×
472
    }
473

474
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,072!
475
    pThread->opened++;
1,072✔
476
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,072✔
477
  }
478

479
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,054!
480
        pThread->opened, pThread->dropped, pThread->failed);
481
  return NULL;
1,054✔
482
}
483

484
/*
 * Initialise the vnode hashes (hash, closedHash, creatingHash) and open all
 * vnodes listed on disk, spreading them round-robin over tsNumOfCores / 2
 * worker threads (at least one).
 *
 * Returns 0 when every listed vnode was either opened or dropped; a non-zero
 * value on allocation failure, when the vnode list cannot be read or written,
 * or when the opened + dropped count does not match the total.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // "+ 1" leaves room for the remainder when vnodes do not divide evenly.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // The distribution loop below writes into pCfgs, so a failed allocation
      // must stop here; release everything allocated so far.
      dError("failed to allocate memory for vnode cfgs of thread:%d since %s", t, terrstr());
      int32_t code = terrno;
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code != 0 ? code : TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // Not fatal here: the vnodes of this thread stay unopened and the
      // opened + dropped == total check below reports the failure.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    terrno = TSDB_CODE_VND_INIT_FAILED;
    return -1;
  }

  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("failed to write vnode list since %s", terrstr());
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
581

582
/*
 * Remove the placeholder for vnode `vgId` from pMgmt->creatingHash and free it.
 *
 * The lookup and removal run under the write side of pMgmt->lock; the
 * placeholder is freed after the lock has been released.
 */
void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  // Reported once, with the reason, instead of twice as before.
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }

  if (pOld) {
    // Log the object's address (pOld), not the address of the local variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
11,117✔
607

608
static void *vmCloseVnodeInThread(void *param) {
6,936✔
609
  SVnodeThread *pThread = param;
6,936✔
610
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
6,936✔
611

612
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
6,936✔
613
  setThreadName("close-vnodes");
6,941✔
614

615
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
13,949✔
616
    SVnodeObj *pVnode = pThread->ppVnodes[v];
7,012✔
617

618
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
7,012✔
619
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
7,012✔
620
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
621
    tmsgReportStartup("vnode-close", stepDesc);
7,012✔
622

623
    vmCloseVnode(pMgmt, pVnode, false, false);
7,012✔
624
  }
625

626
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
6,937!
627
  return NULL;
6,937✔
628
}
629

630
/*
 * Stop the management workers, close all vnodes in pMgmt->hash using
 * tsNumOfCores / 2 threads (at least one), then free whatever is left in
 * closedHash / creatingHash and destroy the three hashes.
 *
 * Returns early, leaving the hashes untouched, when the vnode list cannot be
 * fetched or the thread table cannot be allocated.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // "+ 1" leaves room for the remainder when vnodes do not divide evenly.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    // The setup loop below writes into `threads`, so bail out as the
    // list-fetch failure above does.
    dError("failed to allocate memory for threads since %s", terrstr());
    if (ppVnodes != NULL) {
      taosMemoryFree(ppVnodes);
    }
    return;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    // Slots hold SVnodeObj pointers, so size them with that type.
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      dError("thread:%d, failed to allocate memory for vnode list since %s", t, terrstr());
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    // Vnodes mapped to a thread whose list could not be allocated are skipped.
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  // Free leftover closed-vnode records, then the hash itself. The iteration now
  // sits inside the NULL check so a missing hash is never passed to the iterator.
  if (pMgmt->closedHash != NULL) {
    void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->closedHash, pIter);
    }
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  // Same for placeholders of vnodes that were still being created.
  if (pMgmt->creatingHash != NULL) {
    void *pIter = taosHashIterate(pMgmt->creatingHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->creatingHash, pIter);
    }
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
727

728
/*
 * Tear down the vnode management module and free `pMgmt`.
 *
 * Order: close all vnodes, stop the workers, run vnodeCleanup(), destroy the
 * rwlock and both mutexes, then release the management struct itself.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  /* runtime state */
  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();

  /* synchronisation primitives */
  (void)taosThreadRwlockDestroy(&pMgmt->lock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);

  /* the object itself */
  taosMemoryFree(pMgmt);
}
2,393✔
737

738
/*
 * Run vnodeSyncCheckTimeout() on every vnode from vmGetVnodeListFromHash()
 * that is not marked failed.
 *
 * Each listed vnode is released with vmReleaseVnode() afterwards, and the list
 * array is freed.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     total = 0;
  SVnodeObj **ppList = NULL;

  int32_t code = vmGetVnodeListFromHash(pMgmt, &total, &ppList);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }
  if (ppList == NULL) return;

  for (int32_t idx = 0; idx < total; ++idx) {
    SVnodeObj *pItem = ppList[idx];
    if (!pItem->failed) {
      vnodeSyncCheckTimeout(pItem->pImpl);
    }
    vmReleaseVnode(pMgmt, pItem);
  }
  taosMemoryFree(ppList);
}
759

760
static void *vmThreadFp(void *param) {
2,393✔
761
  SVnodeMgmt *pMgmt = param;
2,393✔
762
  int64_t     lastTime = 0;
2,393✔
763
  setThreadName("vnode-timer");
2,393✔
764

765
  while (1) {
1,065,332✔
766
    lastTime++;
1,067,725✔
767
    taosMsleep(100);
1,067,725✔
768
    if (pMgmt->stop) break;
1,067,725✔
769
    if (lastTime % 10 != 0) continue;
1,065,332✔
770

771
    int64_t sec = lastTime / 10;
105,485✔
772
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
105,485✔
773
      vmCheckSyncTimeout(pMgmt);
2,732✔
774
    }
775
  }
776

777
  return NULL;
2,393✔
778
}
779

780
/*
 * Start the joinable timer thread that runs vmThreadFp().
 *
 * Returns 0 on success. If the thread cannot be created, the failure is
 * logged and a system error code derived from errno is returned.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path so that it is never leaked.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;

  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture errno right away, before any later call can overwrite it.
    code = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
794

795
/*
 * Ask the timer thread to stop and wait for it to finish.
 *
 * Sets pMgmt->stop, then joins and clears the thread handle, provided the
 * handle is reported valid by taosCheckPthreadValid().
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
2,393✔
802

803
/*
 * Create and initialize the vnode management context.
 *
 * Allocates the context, copies the inputs from pInput, installs the
 * vnode-specific message callbacks, initializes the locks, and then brings
 * up wal, sync, the vnode subsystem, the workers, all vnodes and the udf
 * client, reporting each startup step.
 *
 * On success returns 0 and stores the context in pOutput->pMgmt.
 * On failure returns a non-zero code, logs the reason, cleans up whatever
 * context was allocated and leaves pOutput untouched.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  // Override the generic callbacks with the vnode-specific ones.
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->lock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    // The successful mutex init above left code == 0; restore the failure
    // default so this path is not reported to the caller as success.
    code = -1;
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is NULL when the context allocation itself failed; vmCleanup()
    // dereferences its argument, so only call it for a real context.
    // NOTE(review): on early failures vmCleanup() also tears down locks and
    // subsystems that were never initialized - confirm that is safe.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
892

893
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,428✔
894
  *required = tsNumOfSupportVnodes > 0;
2,428✔
895
  return 0;
2,428✔
896
}
897

898
static void *vmRestoreVnodeInThread(void *param) {
1,054✔
899
  SVnodeThread *pThread = param;
1,054✔
900
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,054✔
901

902
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,054!
903
  setThreadName("restore-vnodes");
1,053✔
904

905
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,125✔
906
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,072✔
907
    if (pVnode->failed) {
1,072!
UNCOV
908
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
UNCOV
909
      continue;
×
910
    }
911

912
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,072✔
913
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,072✔
914
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
915
    tmsgReportStartup("vnode-restore", stepDesc);
1,072✔
916

917
    int32_t code = vnodeStart(pVnode->pImpl);
1,072✔
918
    if (code != 0) {
1,072!
UNCOV
919
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
UNCOV
920
      pThread->failed++;
×
921
    } else {
922
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,072!
923
      pThread->opened++;
1,072✔
924
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,072✔
925
    }
926
  }
927

928
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,053!
929
        pThread->failed);
930
  return NULL;
1,054✔
931
}
932

933
/*
 * Restore (start) all vnodes in parallel and then start the timer thread.
 *
 * The vnode list is fetched from the hash and distributed round-robin over
 * max(1, tsNumOfCores / 2) worker threads, each running
 * vmRestoreVnodeInThread(). After the workers are joined, every listed
 * vnode is released and all temporary arrays are freed.
 *
 * Returns the result of vmInitTimer() on the normal path. If the vnode
 * list cannot be fetched or a bookkeeping allocation fails, the error code
 * is returned instead; in that case no timer is started and everything
 * acquired so far is still released.
 *
 * Failures to create an individual worker thread, or to restore an
 * individual vnode, are only logged and do not change the return value.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // Round-robin distribution gives a thread at most this many vnodes.
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      // Abort instead of continuing with a thread that cannot hold vnodes;
      // otherwise its share of vnodes would silently never be restored.
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Wait for every worker before its vnode array is freed below.
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  // Single cleanup path, shared by the normal and the error flow.
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to start vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1017

1018
/*
 * Stop hook of the vnode management module: shuts down the timer thread.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
2,393✔
1019

1020
SMgmtFunc vmGetMgmtFunc() {
2,428✔
1021
  SMgmtFunc mgmtFunc = {0};
2,428✔
1022
  mgmtFunc.openFp = vmInit;
2,428✔
1023
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
2,428✔
1024
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
2,428✔
1025
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
2,428✔
1026
  mgmtFunc.requiredFp = vmRequire;
2,428✔
1027
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
2,428✔
1028

1029
  return mgmtFunc;
2,428✔
1030
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc