• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4192

30 May 2025 03:55AM UTC coverage: 63.023% (-0.2%) from 63.267%
#4192

push

travis-ci

web-flow
fix:defined col bind in interlace mode (#31246)

157832 of 318864 branches covered (49.5%)

Branch coverage included in aggregate %.

1 of 3 new or added lines in 1 file covered. (33.33%)

2934 existing lines in 172 files now uncovered.

243367 of 317732 relevant lines covered (76.6%)

17346426.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.83
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Report the primary disk id stored on the running vnode identified by vgId.
 *
 * The running hash is consulted under the read side of pMgmt->hashLock.
 * Returns the vnode's diskPrimary, or -1 when the lookup yields no vnode.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  // Only the resulting pointer is examined; the lookup status itself is not used.
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t diskId = (pFound != NULL) ? pFound->diskPrimary : -1;
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return diskId;
}
35

36
/*
 * Release a vnode object (its path string and the object itself) and reset
 * the caller's pointer to NULL. A NULL ppVnode or NULL *ppVnode is a no-op.
 *
 * While the reference count is still positive the function warns and sleeps
 * 200ms before checking again, so it only frees once the count has dropped.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) {
    return;
  }

  SVnodeObj *pObj = *ppVnode;

  for (;;) {
    int32_t ref = atomic_load_32(&pObj->refCount);
    if (ref <= 0) {
      break;
    }
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, ref);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

53
/*
 * Record that vnode vgId is being created on disk diskId by inserting a
 * placeholder SVnodeObj into pMgmt->creatingHash (under the hashLock write lock).
 *
 * Returns 0 on success, terrno when the placeholder cannot be allocated, or the
 * error returned by the lock / hash insertion. On any failure the placeholder
 * is freed here, so the caller owns nothing.
 */
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  // The calloc result is already zero-filled, so no additional memset is needed.
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pCreatingVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }

  pCreatingVnode->vgId = vgId;
  pCreatingVnode->diskPrimary = diskId;

  int32_t code = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (code != 0) {
    taosMemoryFree(pCreatingVnode);
    return code;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pCreatingVnode);
  }

  // An unlock failure is only reported; the insertion result is what the caller gets.
  int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }

  return code;
}
85

86
/*
 * Drop the "creating" placeholder for vgId: look it up in pMgmt->creatingHash,
 * remove the hash entry (both under the hashLock write lock), then free the
 * placeholder object outside the lock.
 *
 * Lookup and removal failures are logged; the function has no return value.
 */
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  if (r != 0) {
    // Reported once, with the reason, instead of two separate messages.
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (pOld) {
    // Log the object being freed, not the address of the local pointer variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
11,607✔
111

112
/*
 * Choose the level-0 disk that will host vnode vgId.
 *
 * - Without a tfs instance, disk 0 is returned.
 * - If the vnode's info file (or its temporary variant) is already found by
 *   tfsSearch, the disk id reported by that search is returned.
 * - Otherwise the vnodes returned by vmGetAllVnodeListFromHashWithCreating are
 *   counted per primary disk, the disk with the fewest vnodes is selected, and
 *   the choice is recorded through vmRegisterCreatingState. This part runs
 *   under pMgmt->mutex.
 *
 * Returns the chosen disk id, or a non-zero error code from the mutex, the
 * vnode list lookup, or the registration step.
 * NOTE(review): disk ids and error codes share the return value; callers must
 * know how to tell them apart - confirm against call sites.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  for (int32_t v = 0; v < numOfVnodes; v++) {
    // Same NULL tolerance as the release loop at the end of this function.
    if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
    SVnodeObj *pVnode = ppVnodes[v];
    int32_t    primary = pVnode->diskPrimary;
    // A disk id outside the counter array must not be used as an index.
    if (primary < 0 || primary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, ignore out-of-range primary disk:%d when counting vnodes", pVnode->vgId, primary);
      continue;
    }
    disks[primary] += 1;
  }

  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  // Never scan past the counter array, whatever the tier reports.
  int32_t scanLimit = (ndisk > TFS_MAX_DISKS_PER_TIER) ? TFS_MAX_DISKS_PER_TIER : ndisk;
  diskId = 0;
  for (int32_t id = 0; id < scanLimit; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);

  // The mutex is released on both outcomes. An unlock failure becomes the
  // result only when registration itself succeeded; otherwise it is just logged.
  int32_t unlockCode = taosThreadMutexUnlock(&pMgmt->mutex);
  if (code != 0) {
    if (unlockCode != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(unlockCode));
    }
  } else {
    code = unlockCode;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  } else {
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
    return diskId;
  }
}
202

203
/*
 * Public entry point that removes the "creating" registration of vgId by
 * delegating to vmUnRegisterCreatingState.
 */
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
11,607✔
204

205
/*
 * Look up a running vnode by vgId and take a reference on it.
 *
 * The lookup and the reference-count increment both happen under the read
 * side of pMgmt->hashLock. When strict is true, a vnode flagged as dropped or
 * failed is treated as if it were absent.
 *
 * Returns the vnode with its refCount incremented, or NULL with terrno set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pVnode = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);

  bool usable = (pVnode != NULL);
  if (usable && strict) {
    usable = !(pVnode->dropped || pVnode->failed);
  }

  if (usable) {
    int32_t refs = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refs);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pVnode = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return pVnode;
}
221

222
/*
 * Strict variant of vmAcquireVnodeImpl: calls it with strict == true, so
 * dropped or failed vnodes are not returned.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
52,791,668✔
223

224
/*
 * Give back one reference on pVnode by atomically decrementing its refCount.
 * A NULL pVnode is ignored. pMgmt is not used by this function.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
232

233
/*
 * Insert pVnode into pMgmt->runngingHash keyed by its vgId.
 *
 * Any object already stored under the same key is freed first. No lock is
 * taken here; the visible callers hold hashLock around the call.
 * Returns the result of taosHashPut.
 */
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pPrev = NULL;
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);

  if (taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pPrev) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pPrev != NULL) {
    vmFreeVnodeObj(&pPrev);
  }

  return taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
248

249
/*
 * Remove the entry for vgId from pMgmt->runngingHash; a failure is logged.
 * The stored vnode object itself is not freed here.
 */
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);

  int32_t rc = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (rc == 0) {
    return;
  }
  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(rc));
}
14,498✔
256

257
/*
 * Keep a lightweight record of a vnode that is being closed.
 *
 * A fresh SVnodeObj carrying vgId, dropped, vgVersion, diskPrimary and toVgId
 * of pVnode is stored in pMgmt->closedHash; any record previously stored under
 * the same vgId is freed first. No lock is taken here; the visible caller
 * holds hashLock.
 *
 * Returns terrno when the record cannot be allocated, otherwise 0. A failed
 * hash insertion is logged but still reported as 0, as before; the only change
 * is that the orphaned record is now freed instead of leaked.
 */
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  int32_t code = 0;
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
  // The calloc result is already zero-filled, so no additional memset is needed.
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }

  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (r != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // Nothing references the record when the insertion failed; release it.
    taosMemoryFree(pClosedVnode);
  }

  return code;
}
289

290
/*
 * Discard the closed-state record stored for pVnode->vgId, if there is one:
 * the record is freed and its entry removed from pMgmt->closedHash.
 * Lookup and removal failures are logged.
 */
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pRecord = NULL;
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);

  int32_t rc = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pRecord);
  if (rc != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pRecord == NULL) {
    return;
  }

  vmFreeVnodeObj(&pRecord);
  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  rc = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  if (rc != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
}
14,498✔
306

307
/*
 * Create the management object for a vnode and publish it as running.
 *
 * The object copies vgId, vgVersion, diskPrimary and path from pCfg and keeps
 * pImpl. With a non-NULL pImpl its queues are allocated through vmAllocQueue;
 * with a NULL pImpl the object is flagged as failed instead. Under the
 * hashLock write lock the object is then put into the running hash and any
 * closed-state record for the same vgId is removed.
 *
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the object, its path
 * copy or its queues cannot be set up; otherwise the result of
 * vmRegisterRunningState.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->path = taosStrdup(pCfg->path);
  if (pObj->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->diskPrimary = pCfg->diskPrimary;
  pObj->pImpl = pImpl;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->failed = 0;

  if (pImpl == NULL) {
    pObj->failed = 1;
  } else if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t result = vmRegisterRunningState(pMgmt, pObj);
  vmUnRegisterClosedState(pMgmt, pObj);
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return result;
}
347

348
/*
 * Close one vnode and free its management object.
 *
 * Steps, in order:
 *  1. If the vnode has an implementation and is leader, propose a commit.
 *  2. Under the hashLock write lock, remove it from the running hash and, when
 *     keepClosed is set, store a closed-state record. If that record cannot be
 *     created the function returns early without closing the vnode.
 *  3. Drop one reference via vmReleaseVnode.
 *  4. Unless the vnode is flagged failed: pre-close, wait for the reference
 *     count and every queue to drain, post-close, free the queues, optionally
 *     commit (commitAndRemoveWal), then close the implementation.
 *  5. With commitAndRemoveWal, remove and recreate the wal directory.
 *  6. If the vnode is dropped, destroy its directory.
 *  7. Free the object.
 *
 * pVnode is invalid after this function returns from step 7.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Initialized up front: the failed-vnode path jumps to _closed and may still
  // pass nodeId to vnodeDestroy. 0 matches what vmOpenVnodeInThread passes when
  // it destroys a vnode that was never opened.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  // Read the counter the same way vmFreeVnodeObj does.
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
  while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ);
  while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  // Must be captured before vnodeClose; pImpl is cleared right after.
  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
460

461
/*
 * Remove vnode vgId from the running hash under the hashLock write lock.
 *
 * If the lock cannot be taken, the failure is logged and nothing else is
 * done; in particular the lock is not released, since it was never acquired.
 */
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
×
475

476
/*
 * Finish a pending vgroup id change recorded in pCfg.
 *
 * When pCfg->toVgId is 0 nothing is pending and 0 is returned. Otherwise
 * vnodeRestoreVgroupId is called with the source and destination vnode
 * directories; on a positive result pCfg->vgId takes that value and
 * pCfg->toVgId is cleared.
 *
 * Returns 0 on success (or nothing to do), -1 when the restore call returns a
 * value <= 0.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromVgId = pCfg->vgId;
  const int32_t targetVgId = pCfg->toVgId;
  if (targetVgId == 0) {
    return 0;
  }

  char srcPath[TSDB_FILENAME_LEN];
  char dstPath[TSDB_FILENAME_LEN];
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, targetVgId);

  int32_t restored = vnodeRestoreVgroupId(srcPath, dstPath, fromVgId, targetVgId, pCfg->diskPrimary, pTfs);
  if (restored > 0) {
    pCfg->vgId = restored;
    pCfg->toVgId = 0;
    return 0;
  }

  dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
  return -1;
}
498

499
static void *vmOpenVnodeInThread(void *param) {
1,154✔
500
  SVnodeThread *pThread = param;
1,154✔
501
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,154✔
502
  char          path[TSDB_FILENAME_LEN];
503

504
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,154!
505
  setThreadName("open-vnodes");
1,154✔
506

507
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,310✔
508
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
1,156✔
509
    if (pCfg->dropped) {
1,156!
510
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
UNCOV
511
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
512
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
513
      tmsgReportStartup("vnode-destroy", stepDesc);
×
514

515
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
516
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
517
      pThread->updateVnodesList = true;
×
UNCOV
518
      pThread->dropped++;
×
UNCOV
519
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
UNCOV
520
      continue;
×
521
    }
522

523
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,156✔
524
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
1,156✔
525
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
526
    tmsgReportStartup("vnode-open", stepDesc);
1,156✔
527

528
    if (pCfg->toVgId) {
1,156!
529
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
UNCOV
530
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
531
        pThread->failed++;
×
UNCOV
532
        continue;
×
533
      }
UNCOV
534
      pThread->updateVnodesList = true;
×
535
    }
536

537
    int32_t diskPrimary = pCfg->diskPrimary;
1,156✔
538
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
1,156✔
539

540
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
1,156✔
541

542
    if (pImpl == NULL) {
1,156!
543
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
UNCOV
544
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
UNCOV
545
        pThread->failed++;
×
UNCOV
546
        continue;
×
547
      }
548
    }
549

550
    if (pImpl != NULL) {
1,156!
551
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
1,156!
UNCOV
552
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
UNCOV
553
        pThread->failed++;
×
UNCOV
554
        continue;
×
555
      }
556
    }
557

558
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
1,156!
559
    pThread->opened++;
1,156✔
560
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,156✔
561
  }
562

563
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
1,154!
564
        pThread->opened, pThread->dropped, pThread->failed);
565
  return NULL;
1,154✔
566
}
567

568
/*
 * Create the running/closed/creating hashes and open every vnode listed on
 * disk, spreading the work over tsNumOfCores / 2 threads (at least one).
 *
 * Vnode configs are dealt round-robin to the threads; each thread runs
 * vmOpenVnodeInThread and is joined before the results are evaluated. If any
 * thread asked for it, the vnode list file is rewritten afterwards.
 *
 * Returns 0 on success. Errors: TSDB_CODE_OUT_OF_MEMORY when a hash cannot be
 * created, the error from vmGetVnodeListFromFile / vmWriteVnodeListToFile,
 * terrno when a work-list allocation fails, or TSDB_CODE_VND_INIT_FAILED when
 * opened + dropped vnodes do not add up to the total.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->runngingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->runngingHash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  int32_t      code = 0;
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
    return code;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // +1 so that the round-robin remainder always fits in a per-thread list.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // The distribution loop below writes into every list, so a missing list
      // must abort here rather than be dereferenced later.
      code = terrno;
      dError("thread:%d, failed to allocate memory for vnode cfgs since %s", t, tstrerror(code));
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // Vnodes of a thread that never started stay unopened; the total check
      // after the join loop then reports the mismatch.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return terrno = TSDB_CODE_VND_INIT_FAILED;
  }

  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
    dError("failed to write vnode list since %s", tstrerror(code));
    return code;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
669

670
static void *vmCloseVnodeInThread(void *param) {
7,495✔
671
  SVnodeThread *pThread = param;
7,495✔
672
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
7,495✔
673

674
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
7,495!
675
  setThreadName("close-vnodes");
7,495✔
676

677
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
15,070✔
678
    SVnodeObj *pVnode = pThread->ppVnodes[v];
7,573✔
679

680
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
7,573✔
681
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
7,573✔
682
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
683
    tmsgReportStartup("vnode-close", stepDesc);
7,573✔
684

685
    vmCloseVnode(pMgmt, pVnode, false, false);
7,574✔
686
  }
687

688
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
7,497!
689
  return NULL;
7,498✔
690
}
691

692
/*
 * Close every running vnode and tear down the three state hashes.
 *
 * The management workers are stopped first. The vnodes returned by
 * vmGetVnodeListFromHash are dealt round-robin to tsNumOfCores / 2 threads
 * (at least one), every vnode's tq is notified of the close, and each thread
 * runs vmCloseVnodeInThread before being joined. Finally the running hash is
 * destroyed, and the closed and creating hashes are destroyed after freeing
 * the objects they still hold.
 *
 * If the vnode list or the thread array cannot be obtained, the failure is
 * logged and the function returns without closing anything.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // +1 so that the round-robin remainder always fits in a per-thread list.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    // Every loop below indexes into threads; bail out instead of dereferencing NULL.
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(ppVnodes);
    return;
  }
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    // Element type of ppVnodes is SVnodeObj *, so size the array with that type.
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    } else {
      // Make the skipped vnode visible in the log instead of dropping it silently.
      dError("thread:%d, no vnode list available, vnode at index:%d will not be closed", t, v);
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  int64_t st = taosGetTimestampMs();
  dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] != NULL) {
        if (ppVnodes[i]->pImpl != NULL) {
          tqNotifyClose(ppVnodes[i]->pImpl->pTq);
        }
      }
    }
  }

  int64_t et = taosGetTimestampMs();
  dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  // Entries of the closed and creating hashes own their objects: free them
  // before destroying the hash. Iteration only happens on an existing hash.
  if (pMgmt->closedHash != NULL) {
    void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->closedHash, pIter);
    }
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  if (pMgmt->creatingHash != NULL) {
    void *pIter = taosHashIterate(pMgmt->creatingHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->creatingHash, pIter);
    }
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
807

808
/*
 * Tear down the vnode management object.
 *
 * Closes all vnodes, stops the workers, cleans up the vnode module, destroys
 * the hash lock and the two mutexes, and finally frees pMgmt itself.
 *
 * pMgmt may be NULL: vmInit() invokes this function on its failure path even
 * when the allocation of pMgmt itself failed. In that case there is nothing
 * to release, and vmCloseVnodes() would dereference the NULL pointer, so we
 * return immediately.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);
  taosMemoryFree(pMgmt);
}
2,689✔
817

818
/*
 * Run the sync timeout check on every vnode returned by
 * vmGetVnodeListFromHash(). Vnodes flagged as failed are skipped for the
 * check, but every listed vnode is released afterwards and the list itself
 * is freed. If the list cannot be obtained, the error is logged and nothing
 * else happens.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  SVnodeObj **vnodeList = NULL;
  int32_t     vnodeCount = 0;

  int32_t ret = vmGetVnodeListFromHash(pMgmt, &vnodeCount, &vnodeList);
  if (ret != 0) {
    dError("failed to get vnode list since %s", tstrerror(ret));
    return;
  }

  if (vnodeList == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    SVnodeObj *pObj = vnodeList[idx];
    if (!pObj->failed) {
      vnodeSyncCheckTimeout(pObj->pImpl);
    }
    vmReleaseVnode(pMgmt, pObj);
  }

  taosMemoryFree(vnodeList);
}
839

840
static void *vmThreadFp(void *param) {
2,689✔
841
  SVnodeMgmt *pMgmt = param;
2,689✔
842
  int64_t     lastTime = 0;
2,689✔
843
  setThreadName("vnode-timer");
2,689✔
844

845
  while (1) {
1,212,643✔
846
    lastTime++;
1,215,332✔
847
    taosMsleep(100);
1,215,332✔
848
    if (pMgmt->stop) break;
1,215,332✔
849
    if (lastTime % 10 != 0) continue;
1,212,643✔
850

851
    int64_t sec = lastTime / 10;
120,090✔
852
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
120,090✔
853
      vmCheckSyncTimeout(pMgmt);
3,062✔
854
    }
855
  }
856

857
  return NULL;
2,689✔
858
}
859

860
/*
 * Start the joinable vnode timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * Returns 0 on success, or the system error code derived from ERRNO when the
 * thread could not be created. The thread attribute object is destroyed on
 * every path, including the failure path.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture the error right away, before another call can overwrite ERRNO
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  // The attribute object is no longer needed whether or not creation succeeded
  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
877

878
/*
 * Ask the timer thread to stop by setting pMgmt->stop and, if the stored
 * thread handle is valid, join the thread and clear the handle.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
2,689✔
885

886
/*
 * Create and initialize the vnode management object.
 *
 * Steps, in order: allocate SVnodeMgmt, copy the input options, install the
 * queue callbacks, initialize the hash lock and the two mutexes, take over
 * the tfs handle, then initialize wal, sync, the vnode module, the workers,
 * open all vnodes and open udfc.
 *
 * pInput  - options supplied by the caller (data, path, name, msgCb, pTfs,
 *           stopDnodeFp are read here).
 * pOutput - on success pOutput->pMgmt receives the new management object.
 *
 * Returns 0 on success. On any failure a non-zero code is returned, the
 * error is logged and, if the management object was allocated, it is torn
 * down with vmCleanup().
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    // code is 0 here (left over from the successful mutex init above), so it
    // must be reset to a failure value or _OVER would report success.
    code = -1;
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is NULL when its own allocation failed; vmCleanup() would
    // dereference it, so only clean up an object that actually exists.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
975

976
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,726✔
977
  *required = tsNumOfSupportVnodes > 0;
2,726✔
978
  return 0;
2,726✔
979
}
980

981
static void *vmRestoreVnodeInThread(void *param) {
1,154✔
982
  SVnodeThread *pThread = param;
1,154✔
983
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
1,154✔
984

985
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
1,154!
986
  setThreadName("restore-vnodes");
1,154✔
987

988
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
2,309✔
989
    SVnodeObj *pVnode = pThread->ppVnodes[v];
1,156✔
990
    if (pVnode->failed) {
1,156!
UNCOV
991
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
UNCOV
992
      continue;
×
993
    }
994

995
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
1,156✔
996
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
1,156✔
997
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
998
    tmsgReportStartup("vnode-restore", stepDesc);
1,156✔
999

1000
    int32_t code = vnodeStart(pVnode->pImpl);
1,155✔
1001
    if (code != 0) {
1,156!
UNCOV
1002
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
UNCOV
1003
      pThread->failed++;
×
1004
    } else {
1005
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
1,156!
1006
      pThread->opened++;
1,156✔
1007
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
1,156✔
1008
    }
1009
  }
1010

1011
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
1,153!
1012
        pThread->failed);
1013
  return NULL;
1,154✔
1014
}
1015

1016
/*
 * Restore all vnodes in parallel and then start the vnode timer thread.
 *
 * The vnode list is obtained from vmGetVnodeListFromHash() and distributed
 * round-robin over max(1, tsNumOfCores / 2) worker slots. A joinable thread
 * running vmRestoreVnodeInThread() is created for every slot that received
 * at least one vnode, and all created threads are joined before returning.
 *
 * Every path after the list was obtained goes through the same cleanup
 * section, so the per-thread arrays, the thread table and the vnode list are
 * freed and each listed vnode is released even when an allocation fails.
 *
 * Returns the error from vmGetVnodeListFromHash(), the allocation error
 * (terrno) if the worker bookkeeping cannot be allocated, or otherwise the
 * result of vmInitTimer(). A failure to create an individual worker thread
 * is only logged.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      // Without a slot array some vnodes could never be restored; fail the
      // whole start instead of silently skipping them.
      code = terrno;
      goto _exit;
    }
  }

  // Round-robin distribution of the vnodes over the worker slots
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Wait for every worker that was actually started
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  // On the error paths no worker thread has been created yet, so freeing the
  // slot arrays here is safe; unallocated slots are NULL from calloc.
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    return code;
  }

  return vmInitTimer(pMgmt);
}
1100

1101
// Stop callback of the vnode management node: shuts down the timer thread.
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
2,689✔
1102

1103
SMgmtFunc vmGetMgmtFunc() {
2,726✔
1104
  SMgmtFunc mgmtFunc = {0};
2,726✔
1105
  mgmtFunc.openFp = vmInit;
2,726✔
1106
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
2,726✔
1107
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
2,726✔
1108
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
2,726✔
1109
  mgmtFunc.requiredFp = vmRequire;
2,726✔
1110
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
2,726✔
1111

1112
  return mgmtFunc;
2,726✔
1113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc