• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.09
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/**
 * Report the primary disk id recorded for the vnode stored under vgId in the
 * running hash.
 *
 * The lookup is done while holding the hash read lock.
 *
 * @param pMgmt vnode management object owning the hash and its lock
 * @param vgId  vgroup id used as the hash key
 * @return the vnode's diskPrimary, or -1 when the lookup yields no vnode
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  // The outcome is judged solely by whether pFound was filled in.
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t result = (pFound == NULL) ? -1 : pFound->diskPrimary;
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return result;
}
35

36
/**
 * Free a vnode object and clear the caller's pointer to it.
 *
 * Blocks, polling every 200ms, for as long as the object's reference count is
 * still positive; then frees the path string and the object itself.
 *
 * @param ppVnode address of the pointer to free; NULL or pointing at NULL is a no-op
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  // Wait until every holder has dropped its reference.
  for (int32_t ref = atomic_load_32(&pObj->refCount); ref > 0; ref = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, ref);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

53
/**
 * Record in the creating hash that vgId is being created on disk diskId.
 *
 * A placeholder SVnodeObj carrying only vgId and diskPrimary is allocated and
 * its pointer is stored in the hash under the hash write lock. The placeholder
 * is freed again if the lock cannot be taken or the insert fails.
 *
 * @return 0 on success; terrno on allocation failure; otherwise the failing
 *         lock or hash-put status
 */
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  SVnodeObj *pPlaceholder = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pPlaceholder == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pPlaceholder, 0, sizeof(SVnodeObj));
  pPlaceholder->vgId = vgId;
  pPlaceholder->diskPrimary = diskId;

  int32_t lockRc = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (lockRc != 0) {
    taosMemoryFree(pPlaceholder);
    return lockRc;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pPlaceholder);
  int32_t putRc = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pPlaceholder, sizeof(SVnodeObj *));
  if (putRc != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pPlaceholder);
  }

  // An unlock failure is only reported; the insert status is what gets returned.
  int32_t unlockRc = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (unlockRc != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(unlockRc));
  }

  return putRc;
}
85

86
/**
 * Remove vgId from the creating hash and free the placeholder object that was
 * stored there.
 *
 * The lookup and removal happen under the hash write lock; the placeholder is
 * freed after the lock has been released. Failures are logged and otherwise
 * ignored.
 *
 * @param pMgmt vnode management object owning the creating hash
 * @param vgId  vgroup id to unregister
 */
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  if (r != 0) {
    // Reported once, with the reason (previously logged twice).
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (pOld) {
    // Log the object pointer itself, not the address of the local variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
40✔
111

112
/**
 * Choose the level-0 disk that will host vnode vgId.
 *
 * If the vnode's info file (or its temporary variant) already exists on some
 * disk, that disk id is returned. Otherwise the vnodes returned by
 * vmGetAllVnodeListFromHashWithCreating() are counted per diskPrimary, the
 * level-0 disk with the fewest vnodes is chosen, and the choice is recorded via
 * vmRegisterCreatingState() while pMgmt->mutex is held.
 *
 * @param pMgmt vnode management object
 * @param vgId  vgroup id of the vnode being created
 * @return the chosen disk id (0 when pMgmt->pTfs is NULL), or the failing
 *         status code when locking, listing or registration fails
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  // Count vnodes per disk. Entries that are NULL or carry a disk id outside the
  // counter array are skipped instead of being used as an index.
  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;
    if (pVnode->diskPrimary < 0 || pVnode->diskPrimary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, ignore invalid primary disk:%d while allocating disk", pVnode->vgId, pVnode->diskPrimary);
      continue;
    }
    disks[pVnode->diskPrimary] += 1;
  }

  // Pick the disk with the fewest vnodes; never read past the counter array.
  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  diskId = 0;
  for (int32_t id = 0; id < ndisk && id < TFS_MAX_DISKS_PER_TIER; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
  int32_t unlockRc = taosThreadMutexUnlock(&pMgmt->mutex);
  if (code != 0) {
    // Registration failed: keep that error, only report a failing unlock.
    if (unlockRc != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(unlockRc));
    }
  } else {
    code = unlockRc;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  } else {
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
    return diskId;
  }
}
202

203
/**
 * Drop the creating-state record for vgId by delegating to
 * vmUnRegisterCreatingState().
 */
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
40✔
204

205
/**
 * Look up a vnode in the running hash and take a reference on it.
 *
 * The lookup and the reference-count increment both happen under the hash read
 * lock.
 *
 * @param pMgmt  vnode management object
 * @param vgId   vgroup id to look up
 * @param strict when true, a vnode flagged dropped or failed is treated as absent
 * @return the vnode with its refCount incremented, or NULL with terrno set to
 *         TSDB_CODE_VND_INVALID_VGROUP_ID
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pVnode = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);

  bool usable = (pVnode != NULL) && !(strict && (pVnode->dropped || pVnode->failed));
  if (usable) {
    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    dDebug("vgId:%d, acquire vnode failed.", vgId);
    pVnode = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return pVnode;
}
222

223
/**
 * Acquire a vnode in strict mode: shorthand for
 * vmAcquireVnodeImpl(pMgmt, vgId, true).
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  return vmAcquireVnodeImpl(pMgmt, vgId, true);
}
86,882✔
224

225
/**
 * Give back one reference on a vnode by atomically decrementing its refCount.
 *
 * @param pMgmt  unused by the body
 * @param pVnode vnode to release; NULL is a no-op
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
233

234
/**
 * Store pVnode in the running hash under its vgId.
 *
 * An object already stored under the same key is freed first. No lock is taken
 * inside this function.
 *
 * @return the status of the hash insert
 */
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pPrev = NULL;
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);

  if (taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pPrev) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pPrev != NULL) {
    vmFreeVnodeObj(&pPrev);
  }

  return taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
249

250
/**
 * Remove the entry for vgId from the running hash; a failure is only logged.
 */
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);

  int32_t rc = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (rc == 0) return;

  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(rc));
}
53✔
257

258
/**
 * Remember a vnode that is being closed by storing a copy of its identifying
 * fields in the closed hash.
 *
 * A fresh SVnodeObj is allocated and filled with vgId, dropped, vgVersion,
 * diskPrimary, toVgId and mountId from pVnode. An object already stored under
 * the same vgId is freed before the new one is inserted. No lock is taken
 * inside this function.
 *
 * @return terrno when the copy cannot be allocated, otherwise 0. A failed hash
 *         insert is logged but still reported as 0, as before; the copy is
 *         released in that case so it does not leak.
 */
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  int32_t    code = 0;
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));

  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;
  pClosedVnode->mountId = pVnode->mountId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (r != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // The hash did not take the pointer, so nothing else will ever free it.
    taosMemoryFree(pClosedVnode);
  }

  return code;
}
291

292
/**
 * Forget the closed-state record kept for pVnode's vgId.
 *
 * If the closed hash holds an object under that key, the object is freed and
 * the key is removed; otherwise nothing is removed. No lock is taken inside
 * this function.
 */
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pStale = NULL;
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);

  int32_t rc = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pStale);
  if (rc != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pStale == NULL) return;

  vmFreeVnodeObj(&pStale);
  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  rc = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  if (rc != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
}
53✔
308
#ifdef USE_MOUNT
309
/**
 * Get the STfs of a mount, opening and registering it on first use, and take
 * one reference on it.
 *
 * The mount hash is consulted first without the mutex and again with
 * pMgmt->mutex held. If an entry exists its nRef is incremented and its pTfs is
 * returned. Otherwise the mount is checked via vmMountCheckRunning(), its disks
 * are read with vmGetMountDisks(), a new SMountTfs is opened with nRef = 2
 * (initial + this acquire) and stored in the hash.
 *
 * The slot pointer returned by the hash (ppSlot) is kept separate from the
 * object allocated here (pMountTfs), so the error path only ever releases
 * memory this call created.
 *
 * @param pMgmt     vnode management object
 * @param mountId   key of the mount; must be non-zero to open a new mount
 * @param mountName name copied into a newly created entry; may be NULL
 * @param mountPath path of the mount; required to open a new mount
 * @param ppTfs     receives the mount's STfs, or NULL on failure
 * @return 0 on success, otherwise an error code
 */
int32_t vmAcquireMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, const char* mountName, const char *mountPath, STfs **ppTfs) {
  int32_t     code = 0, lino = 0;
  TdFilePtr   pFile = NULL;
  SArray     *pDisks = NULL;
  SMountTfs **ppSlot = NULL;     // hash-owned slot; must never be closed or freed here
  SMountTfs  *pMountTfs = NULL;  // object allocated by this call; released on failure
  bool        unlock = false;

  ppSlot = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  if (ppSlot && *ppSlot) {
    if (!(*ppTfs = (*ppSlot)->pTfs)) {
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
    }
    atomic_add_fetch_32(&(*ppSlot)->nRef, 1);
    TAOS_RETURN(code);
  }
  if (!mountPath || mountPath[0] == 0 || mountId == 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
  }
  (void)(taosThreadMutexLock(&pMgmt->mutex));
  unlock = true;
  // Re-check under the mutex: the entry may have been created in the meantime.
  ppSlot = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  if (ppSlot && *ppSlot) {
    if (!(*ppTfs = (*ppSlot)->pTfs)) {
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
    }
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
    atomic_add_fetch_32(&(*ppSlot)->nRef, 1);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_EXIT(vmMountCheckRunning(mountName, mountPath, &pFile, 3));
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, mountPath, &pDisks));
  int32_t numOfDisks = taosArrayGetSize(pDisks);
  if (numOfDisks <= 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
  }
  TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
  if (mountName) (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", mountName);
  if (mountPath) (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", mountPath);
  pMountTfs->pFile = pFile;
  atomic_store_32(&pMountTfs->nRef, 2);  // init and acquire
  TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), numOfDisks, &pMountTfs->pTfs));
  TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &mountId, sizeof(mountId), &pMountTfs, POINTER_BYTES));
_exit:
  if (unlock) {
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
  }
  taosArrayDestroy(pDisks);
  if (code != 0) {
    dError("mount:%" PRIi64 ",%s, failed at line %d to get mount tfs since %s", mountId, mountPath ? mountPath : "NULL",
           lino, tstrerror(code));
    if (pFile) {
      (void)taosUnLockFile(pFile);
      (void)taosCloseFile(&pFile);
    }
    if (pMountTfs) {
      tfsClose(pMountTfs->pTfs);
      taosMemoryFree(pMountTfs);
    }
    *ppTfs = NULL;
  } else {
    *ppTfs = pMountTfs->pTfs;
  }

  TAOS_RETURN(code);
}
375
#endif
376

377
/**
 * Drop one reference on a mount's tfs and tear the mount down once the count
 * has fallen to minRef or below.
 *
 * Teardown happens with pMgmt->mutex held: the entry is looked up again, its
 * tfs is closed, its file (if any) is unlocked and closed, the object is freed
 * and the key is removed from the mount hash.
 *
 * @param pMgmt   vnode management object
 * @param mountId key of the mount in the mount hash
 * @param minRef  threshold at or below which the mount is torn down
 * @return true when the count reached the threshold (teardown was attempted),
 *         false otherwise; always false when USE_MOUNT is not defined
 */
bool vmReleaseMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, int32_t minRef) {
#ifdef USE_MOUNT
  SMountTfs **ppEntry = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  if (ppEntry == NULL || *ppEntry == NULL) return false;

  int32_t nRef = atomic_sub_fetch_32(&(*ppEntry)->nRef, 1);
  if (nRef > minRef) return false;

  (void)(taosThreadMutexLock(&pMgmt->mutex));
  SMountTfs **ppLocked = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  if (ppLocked && *ppLocked) {
    SMountTfs *pObj = *ppLocked;
    dInfo("mount:%" PRIi64 ", ref:%d, release mount tfs", mountId, nRef);
    tfsClose(pObj->pTfs);
    if (pObj->pFile) {
      (void)taosUnLockFile(pObj->pFile);
      (void)taosCloseFile(&pObj->pFile);
    }
    taosMemoryFree(pObj);
    taosHashRemove(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  }
  (void)taosThreadMutexUnlock(&pMgmt->mutex);
  return true;
#else
  return false;
#endif
}
404

405

406
/**
 * Create the management object for a vnode and publish it in the running hash.
 *
 * Identity fields are copied from pCfg. When pImpl is non-NULL the vnode's
 * queues are allocated via vmAllocQueue(); when it is NULL the object is
 * flagged as failed instead. Under the hash write lock the object is then put
 * into the running hash and any closed-state record for the same vgId is
 * dropped.
 *
 * @param pMgmt vnode management object
 * @param pCfg  configuration supplying vgId, vgVersion, diskPrimary, mountId, path
 * @param pImpl opened vnode implementation, or NULL
 * @return -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when an allocation or the
 *         queue setup fails; otherwise the status of the running-hash insert
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->diskPrimary = pCfg->diskPrimary;
  pObj->mountId = pCfg->mountId;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->failed = 0;
  pObj->path = taosStrdup(pCfg->path);
  pObj->pImpl = pImpl;

  if (pObj->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  if (pImpl == NULL) {
    pObj->failed = 1;
  } else if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t rc = vmRegisterRunningState(pMgmt, pObj);
  vmUnRegisterClosedState(pMgmt, pObj);
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  TAOS_RETURN(rc);
}
447

448
/**
 * Close a vnode: unpublish it, drain its queues, close the implementation and
 * free the management object.
 *
 * Steps, in order:
 *  - if the vnode is leader, propose a commit;
 *  - under the hash write lock, remove it from the running hash and, when
 *    keepClosed is set, record it in the closed hash (the function returns
 *    early if that record cannot be created);
 *  - release one reference; a vnode flagged failed skips straight to the
 *    final cleanup;
 *  - pre-close, wait for refCount to reach 0, clean up the worker queues and
 *    wait for the fetch/query/stream-reader queues to drain, post-close, free
 *    the queues;
 *  - optionally commit (commitAndRemoveWal), then close the implementation;
 *  - final cleanup: optionally recreate the wal directory, destroy the vnode's
 *    data when it is dropped, release its mount tfs, free the object.
 *
 * @param pMgmt              vnode management object
 * @param pVnode             vnode to close; freed by this call unless it
 *                           returns early
 * @param commitAndRemoveWal commit data and recreate the wal directory
 * @param keepClosed         keep a record of the vnode in the closed hash
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char    path[TSDB_FILENAME_LEN] = {0};
  bool    atExit = true;
  // Initialised up front: the `goto _closed` taken for failed vnodes skips the
  // assignment below, yet nodeId is still passed to vnodeDestroy() afterwards.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  // refCount is modified with atomic ops elsewhere; read it the same way.
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode stream reader queue:%p is empty", pVnode->vgId, pVnode->pStreamReaderQ);
  while (!taosQueueEmpty(pVnode->pStreamReaderQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }
  if (pVnode->mountId && vmReleaseMountTfs(pMgmt, pVnode->mountId, pVnode->dropped ? 1 : 0)) {
    vmWriteMountListToFile(pMgmt);
  }

  vmFreeVnodeObj(&pVnode);
}
550

551
/**
 * Remove a vnode that failed to open from the running hash.
 *
 * The removal is done under the hash write lock. If the lock cannot be taken
 * the failure is logged and nothing else happens; in particular the lock is
 * not unlocked, since it was never acquired.
 *
 * @param pMgmt vnode management object
 * @param vgId  vgroup id to remove
 */
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
×
565

566
/**
 * Finish a pending vgroup-id change recorded in pCfg.
 *
 * When pCfg->toVgId is non-zero, vnodeRestoreVgroupId() is called with the
 * source and destination vnode directories; on success pCfg->vgId takes the
 * returned id and pCfg->toVgId is cleared.
 *
 * @return 0 when nothing is pending or the restore succeeded, -1 otherwise
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromVgId = pCfg->vgId;
  const int32_t toVgId = pCfg->toVgId;
  if (toVgId == 0) return 0;

  char srcPath[TSDB_FILENAME_LEN];
  char dstPath[TSDB_FILENAME_LEN];
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toVgId);

  int32_t restored = vnodeRestoreVgroupId(srcPath, dstPath, fromVgId, toVgId, pCfg->diskPrimary, pTfs);
  if (restored <= 0) {
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
    return -1;
  }

  pCfg->vgId = restored;
  pCfg->toVgId = 0;
  return 0;
}
588

589
static void *vmOpenVnodeInThread(void *param) {
13✔
590
  SVnodeThread *pThread = param;
13✔
591
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
13✔
592
  char          path[TSDB_FILENAME_LEN];
593

594
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
13!
595
  setThreadName("open-vnodes");
13✔
596

597
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
26✔
598
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
13✔
599
    if (pCfg->dropped) {
13!
600
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
601
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
602
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
603
      tmsgReportStartup("vnode-destroy", stepDesc);
×
604

605
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
606
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
607
      pThread->updateVnodesList = true;
×
608
      pThread->dropped++;
×
609
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
610
      continue;
×
611
    }
612

613
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
13✔
614
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
13✔
615
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
616
    tmsgReportStartup("vnode-open", stepDesc);
13✔
617

618
    if (pCfg->toVgId) {
13!
619
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
620
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
621
        pThread->failed++;
×
622
        continue;
×
623
      }
624
      pThread->updateVnodesList = true;
×
625
    }
626

627
    int32_t diskPrimary = pCfg->mountId == 0 ? pCfg->diskPrimary : 0;
13!
628
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
13✔
629

630
    STfs *pMountTfs = NULL;
13✔
631
#ifdef USE_MOUNT
632
    bool releaseTfs = false;
13✔
633
    if (pCfg->mountId) {
13!
634
      if (vmAcquireMountTfs(pMgmt, pCfg->mountId, NULL, NULL, &pMountTfs) != 0) {
×
635
        dError("vgId:%d, failed to get mount tfs by thread:%d", pCfg->vgId, pThread->threadIndex);
×
636
        pThread->failed++;
×
637
        continue;
×
638
      }
639
      releaseTfs = true;
×
640
    }
641
#endif
642

643
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, false);
13✔
644

645
    if (pImpl == NULL) {
13!
646
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
647
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
648
        pThread->failed++;
×
649
#ifdef USE_MOUNT
650
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
651
#endif
652
        continue;
×
653
      }
654
    }
655

656
    if (pImpl != NULL) {
13!
657
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
13!
658
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
659
        pThread->failed++;
×
660
#ifdef USE_MOUNT
661
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
662
#endif
663
        continue;
×
664
      }
665
    }
666

667
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
13!
668
    pThread->opened++;
13✔
669
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
13✔
670
  }
671

672
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
13!
673
        pThread->opened, pThread->dropped, pThread->failed);
674
  return NULL;
13✔
675
}
676

677
#ifdef USE_MOUNT
678
/**
 * Open the tfs of every mount listed by vmGetMountListFromFile() and register
 * each one in the mount hash with nRef = 1.
 *
 * For each mount the running check is performed, its disks are read and
 * validated (1..TFS_MAX_DISKS), a SMountTfs is allocated and opened, and its
 * pointer is stored in pMgmt->mountTfsHash. A mount id that is already in the
 * hash is treated as an internal error. On any failure the resources of the
 * mount being processed are released and the error is returned.
 *
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t vmOpenMountTfs(SVnodeMgmt *pMgmt) {
  int32_t    code = 0, lino = 0;
  int32_t    nMounts = 0;
  SMountCfg *pCfgList = NULL;
  SArray    *pDiskArr = NULL;
  TdFilePtr  pLockFile = NULL;
  SMountTfs *pNew = NULL;

  TAOS_CHECK_EXIT(vmGetMountListFromFile(pMgmt, &pCfgList, &nMounts));
  for (int32_t m = 0; m < nMounts; ++m) {
    SMountCfg *pCfg = &pCfgList[m];
    if (taosHashGet(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId))) {
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
    }
    TAOS_CHECK_EXIT(vmMountCheckRunning(pCfg->name, pCfg->path, &pLockFile, 3));
    TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pCfg->path, &pDiskArr));
    int32_t nDisks = taosArrayGetSize(pDiskArr);
    if (nDisks < 1 || nDisks > TFS_MAX_DISKS) {
      dError("mount:%s, %" PRIi64 ", %s, invalid number of disks:%d, expect 1 to %d", pCfg->name, pCfg->mountId,
             pCfg->path, nDisks, TFS_MAX_DISKS);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }

    TSDB_CHECK_NULL((pNew = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDiskArr, 0), TARRAY_SIZE(pDiskArr), &pNew->pTfs));
    (void)snprintf(pNew->name, sizeof(pNew->name), "%s", pCfg->name);
    (void)snprintf(pNew->path, sizeof(pNew->path), "%s", pCfg->path);
    pNew->pFile = pLockFile;
    pNew->nRef = 1;
    TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &pCfg->mountId, sizeof(pCfg->mountId), &pNew, POINTER_BYTES));

    // The hash now refers to the new object; reset the per-mount temporaries so
    // the error path of a later iteration does not release them.
    taosArrayDestroy(pDiskArr);
    pDiskArr = NULL;
    pNew = NULL;
    pLockFile = NULL;
  }
_exit:
  if (code != 0) {
    dError("failed to open mount tfs at line %d since %s", lino, tstrerror(code));
    if (pLockFile) {
      (void)taosUnLockFile(pLockFile);
      (void)taosCloseFile(&pLockFile);
    }
    if (pNew) {
      tfsClose(pNew->pTfs);
      taosMemoryFree(pNew);
    }
    taosArrayDestroy(pDiskArr);
  }
  taosMemoryFree(pCfgList);
  TAOS_RETURN(code);
}
729
#endif
730
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
15✔
731
  pMgmt->runngingHash =
15✔
732
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
15✔
733
  if (pMgmt->runngingHash == NULL) {
15!
734
    dError("failed to init vnode hash since %s", terrstr());
×
735
    return TSDB_CODE_OUT_OF_MEMORY;
×
736
  }
737

738
  pMgmt->closedHash =
15✔
739
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
15✔
740
  if (pMgmt->closedHash == NULL) {
15!
741
    dError("failed to init vnode closed hash since %s", terrstr());
×
742
    return TSDB_CODE_OUT_OF_MEMORY;
×
743
  }
744

745
  pMgmt->creatingHash =
15✔
746
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
15✔
747
  if (pMgmt->creatingHash == NULL) {
15!
748
    dError("failed to init vnode creatingHash hash since %s", terrstr());
×
749
    return TSDB_CODE_OUT_OF_MEMORY;
×
750
  }
751

752
  SWrapperCfg *pCfgs = NULL;
15✔
753
  int32_t      numOfVnodes = 0;
15✔
754
  int32_t      code = 0;
15✔
755
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
15!
756
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
×
757
    return code;
×
758
  }
759

760
  pMgmt->state.totalVnodes = numOfVnodes;
15✔
761

762
  int32_t threadNum = tsNumOfCores / 2;
15✔
763
  if (threadNum < 1) threadNum = 1;
15!
764
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
15✔
765

766
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
15!
767
  if (threads == NULL) {
15!
768
    dError("failed to allocate memory for threads since %s", terrstr());
×
769
    taosMemoryFree(pCfgs);
×
770
    return terrno;
×
771
  }
772

773
  for (int32_t t = 0; t < threadNum; ++t) {
315✔
774
    threads[t].threadIndex = t;
300✔
775
    threads[t].pMgmt = pMgmt;
300✔
776
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
300!
777
  }
778

779
  for (int32_t v = 0; v < numOfVnodes; ++v) {
28✔
780
    int32_t       t = v % threadNum;
13✔
781
    SVnodeThread *pThread = &threads[t];
13✔
782
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
13✔
783
  }
784

785
  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);
15!
786

787
  for (int32_t t = 0; t < threadNum; ++t) {
315✔
788
    SVnodeThread *pThread = &threads[t];
300✔
789
    if (pThread->vnodeNum == 0) continue;
300✔
790

791
    TdThreadAttr thAttr;
792
    (void)taosThreadAttrInit(&thAttr);
13✔
793
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
13✔
794
#ifdef TD_COMPACT_OS
795
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
796
#endif
797
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
13!
798
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
×
799
    }
800

801
    (void)taosThreadAttrDestroy(&thAttr);
13✔
802
  }
803

804
  bool updateVnodesList = false;
15✔
805

806
  for (int32_t t = 0; t < threadNum; ++t) {
315✔
807
    SVnodeThread *pThread = &threads[t];
300✔
808
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
300!
809
      (void)taosThreadJoin(pThread->thread, NULL);
13✔
810
      taosThreadClear(&pThread->thread);
13✔
811
    }
812
    taosMemoryFree(pThread->pCfgs);
300!
813
    if (pThread->updateVnodesList) updateVnodesList = true;
300!
814
  }
815
  taosMemoryFree(threads);
15!
816
  taosMemoryFree(pCfgs);
15!
817

818
  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
15!
819
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
×
820
    return terrno = TSDB_CODE_VND_INIT_FAILED;
×
821
  }
822

823
  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
15!
824
    dError("failed to write vnode list since %s", tstrerror(code));
×
825
    return code;
×
826
  }
827

828
#ifdef USE_MOUNT
829
  bool  updateMountList = false;
15✔
830
  void *pIter = NULL;
15✔
831
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
15!
832
    SMountTfs *pMountTfs = *(SMountTfs **)pIter;
×
833
    if (pMountTfs && atomic_load_32(&pMountTfs->nRef) <= 1) {
×
834
      size_t  keyLen = 0;
×
835
      int64_t mountId = *(int64_t *)taosHashGetKey(pIter, &keyLen);
×
836
      dInfo("mount:%s, %s, %" PRIi64 ", ref:%d, remove unused mount tfs", pMountTfs->name, pMountTfs->path, mountId,
×
837
            atomic_load_32(&pMountTfs->nRef));
838
      if(pMountTfs->pFile) {
×
839
        (void)taosUnLockFile(pMountTfs->pFile);
×
840
        (void)taosCloseFile(&pMountTfs->pFile);
×
841
      }
842
      tfsClose(pMountTfs->pTfs);
×
843
      taosMemoryFree(pMountTfs);
×
844
      taosHashRemove(pMgmt->mountTfsHash, &mountId, keyLen);
×
845
      updateMountList = true;
×
846
    }
847
  }
848
  if (updateMountList && (code = vmWriteMountListToFile(pMgmt)) != 0) {
15!
849
    dError("failed to write mount list at line %d since %s", __LINE__, tstrerror(code));
×
850
    return code;
×
851
  }
852
#endif
853

854
  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
15!
855
  return 0;
15✔
856
}
857

858
static void *vmCloseVnodeInThread(void *param) {
36✔
859
  SVnodeThread *pThread = param;
36✔
860
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
36✔
861

862
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
36!
863
  setThreadName("close-vnodes");
37✔
864

865
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
74✔
866
    SVnodeObj *pVnode = pThread->ppVnodes[v];
37✔
867

868
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
37✔
869
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
37✔
870
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
871
    tmsgReportStartup("vnode-close", stepDesc);
37✔
872

873
    vmCloseVnode(pMgmt, pVnode, false, false);
37✔
874
  }
875

876
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
37!
877
  return NULL;
37✔
878
}
879

880
/*
 * Close all vnodes owned by pMgmt and tear down the vnode bookkeeping hashes.
 *
 * Steps:
 *   1. Stop the mgmt worker and the multi mgmt worker.
 *   2. Take the vnode list from vmGetVnodeListFromHash() and spread it
 *      round-robin (v % threadNum) over tsNumOfCores / 2 (at least 1) threads
 *      running vmCloseVnodeInThread(), then join them.
 *   3. Clean up runngingHash, closedHash, creatingHash and, when USE_MOUNT is
 *      defined, the mount tfs hash together with its entries.
 *
 * If the per-thread bookkeeping cannot be allocated, the vnodes are closed
 * sequentially on the calling thread instead of dereferencing a NULL array or
 * silently skipping vnodes. Note that this fallback runs vmCloseVnodeInThread()
 * inline, so the calling thread is renamed to "close-vnodes".
 *
 * If the vnode list cannot be obtained the function logs and returns without
 * touching the hashes.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  /* Allocate the thread table and every per-thread slot array up front so that
   * a failure is detected before any vnode has been handed out. */
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  bool          allocOk = (threads != NULL);
  for (int32_t t = 0; allocOk && t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
    if (threads[t].ppVnodes == NULL) allocOk = false;
  }

  pMgmt->state.openVnodes = 0;

  if (!allocOk) {
    /* Out of memory: close everything on this thread rather than losing vnodes. */
    dError("failed to allocate memory for close threads, close %d vnodes in current thread", numOfVnodes);
    SVnodeThread serial = {0};
    serial.pMgmt = pMgmt;
    serial.ppVnodes = ppVnodes;
    serial.vnodeNum = (ppVnodes != NULL) ? numOfVnodes : 0;
    (void)vmCloseVnodeInThread(&serial);
  } else {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      int32_t       t = v % threadNum;
      SVnodeThread *pThread = &threads[t];
      if (ppVnodes != NULL) {
        pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
      }
    }

    dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
      (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
      if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
      }

      (void)taosThreadAttrDestroy(&thAttr);
    }

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        (void)taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
    }
  }

  /* Release the per-thread slot arrays (entries are zeroed by calloc, so a
   * partially initialised table is safe to walk) and the table itself. */
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
  }
  taosMemoryFree(threads);

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  /* Vnode objects parked in closedHash / creatingHash are freed before the hash itself. */
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
  }

  if (pMgmt->closedHash != NULL) {
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
  }

  if (pMgmt->creatingHash != NULL) {
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

#ifdef USE_MOUNT
  pIter = NULL;
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
    SMountTfs *mountTfs = *(SMountTfs **)pIter;
    if (mountTfs == NULL) continue; /* same guard as the mount sweep in vmOpenVnodes */
    if (mountTfs->pFile) {
      (void)taosUnLockFile(mountTfs->pFile);
      (void)taosCloseFile(&mountTfs->pFile);
    }
    tfsClose(mountTfs->pTfs);
    taosMemoryFree(mountTfs);
  }
  taosHashCleanup(pMgmt->mountTfsHash);
  pMgmt->mountTfsHash = NULL;
#endif

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
995

996
/*
 * Tear down the vnode management module: close all vnodes, stop the workers,
 * run vnodeCleanup(), destroy the hash lock and mutex, and free pMgmt itself.
 *
 * pMgmt may be NULL (vmInit reaches its failure path with a NULL pointer when
 * the initial allocation fails); in that case there is nothing to release.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  taosMemoryFree(pMgmt);
}
15✔
1004

1005
/*
 * Run the sync timeout check on every vnode returned by
 * vmGetVnodeListFromHash(). Vnodes flagged as failed are skipped for the
 * check, but every listed vnode is passed to vmReleaseVnode() and the list
 * array is freed afterwards.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  SVnodeObj **ppList = NULL;
  int32_t     count = 0;

  int32_t rc = vmGetVnodeListFromHash(pMgmt, &count, &ppList);
  if (rc != 0) {
    dError("failed to get vnode list since %s", tstrerror(rc));
    return;
  }

  /* Nothing was handed back, so there is nothing to check or free. */
  if (ppList == NULL) return;

  int32_t idx = 0;
  while (idx < count) {
    SVnodeObj *pCur = ppList[idx++];
    if (!pCur->failed) {
      vnodeSyncCheckTimeout(pCur->pImpl);
    }
    vmReleaseVnode(pMgmt, pCur);
  }

  taosMemoryFree(ppList);
}
1026

1027
static void *vmThreadFp(void *param) {
15✔
1028
  SVnodeMgmt *pMgmt = param;
15✔
1029
  int64_t     lastTime = 0;
15✔
1030
  setThreadName("vnode-timer");
15✔
1031

1032
  while (1) {
1,703✔
1033
    lastTime++;
1,718✔
1034
    taosMsleep(100);
1,718✔
1035
    if (pMgmt->stop) break;
1,718✔
1036
    if (lastTime % 10 != 0) continue;
1,703✔
1037

1038
    int64_t sec = lastTime / 10;
163✔
1039
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
163✔
1040
      vmCheckSyncTimeout(pMgmt);
4✔
1041
    }
1042
  }
1043

1044
  return NULL;
15✔
1045
}
1046

1047
/*
 * Start the joinable timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) if the thread cannot be
 * created. The thread attribute object is destroyed on both paths; previously
 * the failure path returned without destroying it.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    /* Capture the error before any further call can overwrite it. */
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
1064

1065
/*
 * Ask the timer thread to stop by setting pMgmt->stop, then join it and clear
 * the handle if the handle is valid.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  /* No valid thread handle means there is nothing to wait for. */
  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
15✔
1072

1073
/*
 * Create and initialise the vnode management object.
 *
 * Copies the relevant fields from pInput, initialises the hash lock and mutex,
 * (with USE_MOUNT) the mount tfs hash and mounts, then wal, sync, vnode, the
 * workers, opens all vnodes and finally opens udfc. Each completed stage is
 * reported through tmsgReportStartup().
 *
 * On success stores the new object in pOutput->pMgmt and returns 0.
 * On any failure logs the reason, releases what was built through vmCleanup()
 * (skipped when the object itself could not be allocated) and returns a
 * non-zero code.
 *
 * Fixes relative to the previous version:
 *   - allocation failure no longer passes NULL to vmCleanup();
 *   - a NULL tfs handle now returns an error (code was 0 at that point, so the
 *     failure used to be reported as success);
 *   - mountTfsHash init failure goes through the common cleanup path instead
 *     of returning directly and leaking pMgmt.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    /* The two init calls above left code == 0; restore the failure sentinel
     * so _OVER does not treat this as success. */
    code = -1;
    goto _OVER;
  }
#ifdef USE_MOUNT
  if (!(pMgmt->mountTfsHash =
            taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK))) {
    dError("failed to init mountTfsHash since %s", terrstr());
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if ((code = vmOpenMountTfs(pMgmt)) != 0) {
    goto _OVER;
  }
#endif
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    /* Nothing to release when the object itself was never allocated. */
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
1166

1167
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
15✔
1168
  *required = tsNumOfSupportVnodes > 0;
15✔
1169
  return 0;
15✔
1170
}
1171

1172
static void *vmRestoreVnodeInThread(void *param) {
13✔
1173
  SVnodeThread *pThread = param;
13✔
1174
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
13✔
1175

1176
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
13!
1177
  setThreadName("restore-vnodes");
13✔
1178

1179
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
26✔
1180
    SVnodeObj *pVnode = pThread->ppVnodes[v];
13✔
1181
    if (pVnode->failed) {
13!
1182
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
1183
      continue;
×
1184
    }
1185

1186
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
13✔
1187
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
13✔
1188
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
1189
    tmsgReportStartup("vnode-restore", stepDesc);
13✔
1190

1191
    int32_t code = vnodeStart(pVnode->pImpl);
13✔
1192
    if (code != 0) {
13!
1193
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1194
      pThread->failed++;
×
1195
    } else {
1196
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
13!
1197
      pThread->opened++;
13✔
1198
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
13✔
1199
    }
1200
  }
1201

1202
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
13!
1203
        pThread->failed);
1204
  return NULL;
13✔
1205
}
1206

1207
/*
 * Restore (start) all vnodes in parallel and then start the vnode timer.
 *
 * The vnode list from vmGetVnodeListFromHash() is spread round-robin
 * (v % threadNum) over tsNumOfCores / 2 (at least 1) threads running
 * vmRestoreVnodeInThread(). After the threads are joined every listed vnode is
 * passed to vmReleaseVnode() and the list is freed.
 *
 * Returns the error from vmGetVnodeListFromHash(), the allocation error
 * (terrno) if the thread bookkeeping cannot be allocated, or otherwise the
 * result of vmInitTimer().
 *
 * All exits after the list has been obtained go through _exit, so an
 * allocation failure now releases the vnodes and the list and is reported to
 * the caller. Previously it either leaked them or was silently ignored, with
 * the affected vnodes never started.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
    if (threads[t].ppVnodes == NULL) {
      /* Do not start a partial restore: some vnodes would have no slot. */
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  /* Slot arrays are zeroed by calloc, so a partially initialised table is safe to walk. */
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to restore vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1291

1292
/* Stop hook of the vnode mgmt module: shuts down the timer thread. */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
15✔
1293

1294
SMgmtFunc vmGetMgmtFunc() {
15✔
1295
  SMgmtFunc mgmtFunc = {0};
15✔
1296
  mgmtFunc.openFp = vmInit;
15✔
1297
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
15✔
1298
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
15✔
1299
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
15✔
1300
  mgmtFunc.requiredFp = vmRequire;
15✔
1301
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
15✔
1302

1303
  return mgmtFunc;
15✔
1304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc