• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5058

17 May 2026 01:15AM UTC coverage: 73.387% (-0.02%) from 73.406%
#5058

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281656 of 383795 relevant lines covered (73.39%)

135114337.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.03
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Return the primary disk id recorded for a vnode in the running hash.
 *
 * The lookup runs under the read side of pMgmt->hashLock.
 * Returns -1 when no object was copied out for vgId.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  // The lookup status is not inspected; only whether pFound was filled in matters.
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);
  int32_t primary = (pFound != NULL) ? pFound->diskPrimary : -1;
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return primary;
}
35

36
/*
 * Free a vnode object and clear the caller's pointer.
 *
 * Blocks, polling every 200ms, for as long as the object's refCount is
 * positive. A NULL ppVnode or a NULL *ppVnode is a no-op.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  for (int32_t refs = atomic_load_32(&pObj->refCount); refs > 0; refs = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, refs);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

53
/*
 * Record that vgId is being created on diskId by inserting a placeholder
 * SVnodeObj into pMgmt->creatingHash under the hash write lock.
 *
 * Returns 0 on success, terrno when the allocation fails, or the status of the
 * failing lock / hash-put call. The placeholder is freed on every failure path.
 */
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  SVnodeObj *pPlaceholder = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pPlaceholder == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pPlaceholder, 0, sizeof(SVnodeObj));
  pPlaceholder->diskPrimary = diskId;
  pPlaceholder->vgId = vgId;

  int32_t status = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (status != 0) {
    taosMemoryFree(pPlaceholder);
    return status;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pPlaceholder);
  status = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pPlaceholder, sizeof(SVnodeObj *));
  if (status != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pPlaceholder);
  }

  // An unlock failure is only logged; the caller still receives the hash-put status.
  int32_t unlockRc = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (unlockRc != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(unlockRc));
  }

  return status;
}
85

86
/*
 * Remove the "creating" placeholder for vgId from pMgmt->creatingHash and free it.
 *
 * The lookup and removal run under the hash write lock; the placeholder itself
 * is freed after the lock is dropped. A failed removal is logged once, with the
 * reason, and is otherwise ignored.
 */
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }

  if (pOld) {
    // Log the object being freed, not the address of the local variable holding it.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
3,734,367✔
111

112
/*
 * Choose the level-0 disk that will hold vnode vgId.
 *
 * Order of decisions:
 *   1. No tfs configured -> disk 0.
 *   2. The vnode info file (or its temporary variant) already exists on some
 *      disk -> that disk.
 *   3. Otherwise count, per disk, the vnodes returned by
 *      vmGetAllVnodeListFromHashWithCreating() and pick the disk with the
 *      fewest; the choice is then recorded via vmRegisterCreatingState().
 *
 * Returns the chosen disk id on success. On failure the error code of the
 * failing call is returned through the same int32_t.
 * NOTE(review): callers cannot tell a non-negative error code from a disk id;
 * confirm the sign convention of the codes returned here.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  // Count vnodes per disk. Entries that are NULL or carry a disk id outside
  // the counter array are skipped instead of indexing out of bounds.
  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;

    int32_t primary = pVnode->diskPrimary;
    if (primary < 0 || primary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, skip vnode with invalid primary disk:%d when allocating disk", pVnode->vgId, primary);
      continue;
    }
    disks[primary] += 1;
  }

  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  diskId = 0;
  // Never scan past the counter array, whatever the tfs layer reports.
  for (int32_t id = 0; id < ndisk && id < TFS_MAX_DISKS_PER_TIER; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);

  // Exactly one unlock on every path. A registration error takes precedence
  // over an unlock error, which is then only logged.
  int32_t unlockRc = taosThreadMutexUnlock(&pMgmt->mutex);
  if (code != 0) {
    if (unlockRc != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(unlockRc));
    }
  } else {
    code = unlockRc;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  }

  dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
  return diskId;
}
202

203
/* Drop the "creating" placeholder for vgId; thin wrapper over vmUnRegisterCreatingState(). */
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
3,734,662✔
204

205
/*
 * Look up a vnode in the running hash and take a reference on it.
 *
 * With strict set, a vnode flagged dropped or failed is treated as absent.
 * On success refCount is incremented and the object is returned; pair the call
 * with vmReleaseVnode(). On failure terrno is set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is returned.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);

  bool usable = (pFound != NULL) && !(strict && (pFound->dropped || pFound->failed));
  if (usable) {
    int32_t refs = atomic_add_fetch_32(&pFound->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pFound->vgId, pFound, refs);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    dDebug("vgId:%d, acquire vnode failed.", vgId);
    pFound = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return pFound;
}
222

223
/* Strict acquire: same as vmAcquireVnodeImpl() with strict == true. */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  return vmAcquireVnodeImpl(pMgmt, vgId, true);
}
2,147,483,647✔
224

225
/*
 * Drop one reference on pVnode (atomic decrement of refCount).
 * A NULL pVnode is ignored. No lock is taken and pMgmt is not read.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
233

234
/*
 * Insert pVnode into the running hash, first freeing any object already stored
 * under the same vgId.
 *
 * Takes no lock itself; vmOpenVnode() calls it with hashLock write-held.
 * Returns the status of taosHashPut().
 */
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pPrev = NULL;
  dInfo("vgId:%d, put vnode into running hash", pVnode->vgId);

  if (taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pPrev) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pPrev != NULL) {
    vmFreeVnodeObj(&pPrev);
  }

  return taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
249

250
/*
 * Remove vgId from the running hash; a failure is logged and otherwise ignored.
 * Takes no lock itself; the callers in this file hold hashLock for writing.
 */
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);

  int32_t rc = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (rc == 0) return;

  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(rc));
}
5,050,546✔
257

258
/*
 * Store a lightweight snapshot of pVnode (vgId, dropped, vgVersion,
 * diskPrimary, toVgId, mountId) in pMgmt->closedHash, replacing and freeing
 * any snapshot already stored for the same vgId.
 *
 * Takes no lock itself; vmCloseVnode() calls it with hashLock write-held.
 *
 * Returns terrno when the snapshot cannot be allocated, otherwise 0. A failed
 * hash insertion is logged and the unused snapshot is released, but 0 is still
 * returned: vmCloseVnode() abandons the close on a non-zero result, and the
 * vnode should still be closed when only the bookkeeping failed.
 */
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  int32_t code = 0;
  dInfo("vgId:%d, put into closed hash", pVnode->vgId);
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));

  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;
  pClosedVnode->mountId = pVnode->mountId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (r != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // The hash did not take the pointer, so nothing else will ever free it.
    taosMemoryFree(pClosedVnode);
    pClosedVnode = NULL;
  }

  return code;
}
291

292
/*
 * Discard the closed-state snapshot stored for pVnode->vgId, if there is one:
 * free the snapshot object, then remove its entry from pMgmt->closedHash.
 *
 * Takes no lock itself; vmOpenVnode() calls it with hashLock write-held.
 * Failures are logged only.
 */
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pStale = NULL;
  dInfo("vgId:%d, remove from closed hash", pVnode->vgId);

  int32_t rc = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pStale);
  if (rc != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pStale == NULL) return;

  vmFreeVnodeObj(&pStale);
  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  rc = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  if (rc == 0) return;

  if (rc == TSDB_CODE_NOT_FOUND) {
    dWarn("vgId:%d, vnode not found in closedHash when unregistering", pVnode->vgId);
  } else {
    dError("vgId:%d, failed to remove vnode from hash when unregistering since %s", pVnode->vgId, tstrerror(rc));
  }
}
5,050,546✔
312
#ifdef USE_MOUNT
313
int32_t vmAcquireMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, const char *mountName, const char *mountPath,
3,080✔
314
                          STfs **ppTfs) {
315
  int32_t    code = 0, lino = 0;
3,080✔
316
  TdFilePtr  pFile = NULL;
3,080✔
317
  SArray    *pDisks = NULL;
3,080✔
318
  SMountTfs *pMountTfs = NULL;
3,080✔
319
  bool       unlock = false;
3,080✔
320

321
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
3,080✔
322
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
3,080✔
323
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
2,118✔
324
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
325
    }
326
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
2,118✔
327
    TAOS_RETURN(code);
2,118✔
328
  }
329
  if (!mountPath || mountPath[0] == 0 || mountId == 0) {
962✔
330
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
331
  }
332
  (void)(taosThreadMutexLock(&pMgmt->mutex));
962✔
333
  unlock = true;
962✔
334
  pMountTfs = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
962✔
335
  if (pMountTfs && *(SMountTfs **)pMountTfs) {
962✔
336
    if (!(*ppTfs = (*(SMountTfs **)pMountTfs)->pTfs)) {
576✔
337
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
×
338
    }
339
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
576✔
340
    (void)atomic_add_fetch_32(&(*(SMountTfs **)pMountTfs)->nRef, 1);
576✔
341
    TAOS_RETURN(code);
576✔
342
  }
343

344
  TAOS_CHECK_EXIT(vmMountCheckRunning(mountName, mountPath, &pFile, 3));
386✔
345
  TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, mountPath, &pDisks));
386✔
346
  int32_t numOfDisks = taosArrayGetSize(pDisks);
386✔
347
  if (numOfDisks <= 0) {
386✔
348
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
×
349
  }
350
  TSDB_CHECK_NULL((pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs))), code, lino, _exit, terrno);
386✔
351
  if (mountName) (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", mountName);
386✔
352
  if (mountPath) (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", mountPath);
386✔
353
  pMountTfs->pFile = pFile;
386✔
354
  atomic_store_32(&pMountTfs->nRef, 2);  // init and acquire
386✔
355
  TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), numOfDisks, &pMountTfs->pTfs));
386✔
356
  TAOS_CHECK_EXIT(taosHashPut(pMgmt->mountTfsHash, &mountId, sizeof(mountId), &pMountTfs, POINTER_BYTES));
386✔
357
_exit:
386✔
358
  if (unlock) {
386✔
359
    (void)taosThreadMutexUnlock(&pMgmt->mutex);
386✔
360
  }
361
  taosArrayDestroy(pDisks);
386✔
362
  if (code != 0) {
386✔
363
    dError("mount:%" PRIi64 ",%s, failed at line %d to get mount tfs since %s", mountId, mountPath ? mountPath : "NULL",
×
364
           lino, tstrerror(code));
365
    if (pFile) {
×
366
      (void)taosUnLockFile(pFile);
×
367
      (void)taosCloseFile(&pFile);
×
368
    }
369
    if (pMountTfs) {
×
370
      tfsClose(pMountTfs->pTfs);
×
371
      taosMemoryFree(pMountTfs);
×
372
    }
373
    *ppTfs = NULL;
×
374
  } else {
375
    *ppTfs = pMountTfs->pTfs;
386✔
376
  }
377

378
  TAOS_RETURN(code);
386✔
379
}
380
#endif
381

382
/*
 * Drop one reference on the tfs of mount mountId.
 *
 * When the decremented count is <= minRef, the mount is torn down under
 * pMgmt->mutex: its tfs is closed, its lock file unlocked and closed, the
 * SMountTfs freed and the hash entry removed.
 *
 * Returns true when the teardown branch was entered, false otherwise
 * (including always when USE_MOUNT is not defined).
 */
bool vmReleaseMountTfs(SVnodeMgmt *pMgmt, int64_t mountId, int32_t minRef) {
#ifdef USE_MOUNT
  int32_t     code = 0;
  SMountTfs **ppEntry = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  if (ppEntry == NULL || *ppEntry == NULL) {
    return false;
  }

  int32_t nRef = atomic_sub_fetch_32(&(*ppEntry)->nRef, 1);
  if (nRef > minRef) {
    return false;
  }

  (void)(taosThreadMutexLock(&pMgmt->mutex));
  // Look the entry up again now that the mutex is held.
  SMountTfs **ppLocked = taosHashGet(pMgmt->mountTfsHash, &mountId, sizeof(mountId));
  if (ppLocked && *ppLocked) {
    SMountTfs *pMount = *ppLocked;
    dInfo("mount:%" PRIi64 ", ref:%d, release mount tfs", mountId, nRef);
    tfsClose(pMount->pTfs);
    if (pMount->pFile) {
      (void)taosUnLockFile(pMount->pFile);
      (void)taosCloseFile(&pMount->pFile);
    }
    taosMemoryFree(pMount);
    if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, sizeof(mountId))) < 0) {
      dError("failed at line %d to remove mountId:%" PRIi64 " from mount tfs hash", __LINE__, mountId);
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->mutex);
  return true;
#else
  return false;
#endif
}
411

412
/*
 * Create the management object for a vnode and publish it in the running hash.
 *
 * @param pCfg   supplies vgId, vgVersion, diskPrimary, mountId and path
 * @param pImpl  opened vnode; when NULL the object is registered with
 *               failed = 1 and no queues are allocated
 * @return -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the object, its path
 *         copy or its queues cannot be set up; otherwise the status of
 *         vmRegisterRunningState()
 *
 * Registration and removal of the closed-state snapshot happen together under
 * the hash write lock.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->diskPrimary = pCfg->diskPrimary;
  pObj->mountId = pCfg->mountId;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->failed = 0;
  pObj->path = taosStrdup(pCfg->path);
  pObj->pImpl = pImpl;

  if (pObj->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  if (pImpl == NULL) {
    pObj->failed = 1;
  } else if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t code = vmRegisterRunningState(pMgmt, pObj);
  vmUnRegisterClosedState(pMgmt, pObj);
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  TAOS_RETURN(code);
}
453

454
/*
 * Close a vnode and free its management object.
 *
 * @param commitAndRemoveWal  commit data before closing, then delete and
 *                            re-create the vnode's wal directory
 * @param keepClosed          leave a snapshot of the vnode in the closed hash
 *
 * Sequence: propose a commit when leader -> move the vnode from the running
 * hash to (optionally) the closed hash -> drop one reference -> pre-close ->
 * wait for refCount to reach 0 -> drain and clean up the worker queues ->
 * post-close -> optional commit -> vnodeClose. A vnode flagged failed skips
 * from the reference drop straight to the final cleanup. Finally the vnode
 * directory is destroyed when dropped, the mount tfs reference is released
 * and pVnode is freed.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  vDebug("vgId:%d, start to close vnode", pVnode->vgId);
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Declared up front with a defined value: the failed-vnode path jumps to
  // _closed without querying the node id, yet a dropped vnode still passes it
  // to vnodeDestroy(). 0 matches what vmOpenVnodeInThread() passes there.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      // NOTE(review): the vnode has already left the running hash here, but it
      // is neither closed nor freed on this path; confirm this is intended.
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  // refCount is modified with atomics elsewhere in this file; read it the same way.
  while (atomic_load_32(&pVnode->refCount) > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode stream reader queue:%p is empty", pVnode->vgId, pVnode->pStreamReaderQ);
  while (!taosQueueEmpty(pVnode->pStreamReaderQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }
  // A dropped vnode releases down to minRef 1, any other vnode down to 0.
  if (pVnode->mountId && vmReleaseMountTfs(pMgmt, pVnode->mountId, pVnode->dropped ? 1 : 0)) {
    if (vmWriteMountListToFile(pMgmt) != 0) {
      dError("vgId:%d, failed at line %d to write mount list since %s", pVnode->vgId, __LINE__, terrstr());
    }
  }

  vmFreeVnodeObj(&pVnode);
}
559

560
/*
 * Remove a vnode that failed to open from the running hash.
 *
 * The removal runs under the hash write lock. When the lock cannot be taken,
 * the failure is logged and nothing else is done; in particular the lock is
 * not unlocked, since this thread does not hold it.
 */
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
×
574

575
/*
 * Finish a pending vgroup id change recorded in pCfg (vgId -> toVgId).
 *
 * Nothing is done when toVgId is 0. Otherwise vnodeRestoreVgroupId() is called
 * with the source and destination vnode directories; on success pCfg->vgId is
 * set to the id it returns and pCfg->toVgId is cleared.
 *
 * Returns 0 on success (or nothing to do), -1 when the restore reports an id <= 0.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  int32_t fromId = pCfg->vgId;
  int32_t toId = pCfg->toVgId;
  if (toId == 0) {
    return 0;
  }

  char fromPath[TSDB_FILENAME_LEN];
  char toPath[TSDB_FILENAME_LEN];
  snprintf(fromPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromId);
  snprintf(toPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toId);

  int32_t restoredId = vnodeRestoreVgroupId(fromPath, toPath, fromId, toId, pCfg->diskPrimary, pTfs);
  if (restoredId <= 0) {
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, fromPath);
    return -1;
  }

  pCfg->vgId = restoredId;
  pCfg->toVgId = 0;
  return 0;
}
597

598
static void *vmOpenVnodeInThread(void *param) {
404,100✔
599
  SVnodeThread *pThread = param;
404,100✔
600
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
404,100✔
601
  char          path[TSDB_FILENAME_LEN];
404,013✔
602

603
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
404,100✔
604
  setThreadName("open-vnodes");
404,100✔
605
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
404,100✔
606

607
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
809,940✔
608
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
405,840✔
609
    if (pCfg->dropped) {
405,840✔
610
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
611
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
612
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
613
      tmsgReportStartup("vnode-destroy", stepDesc);
×
614

615
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
616
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
617
      pThread->updateVnodesList = true;
×
618
      pThread->dropped++;
×
619
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
620
      continue;
×
621
    }
622

623
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
405,840✔
624
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
405,840✔
625
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
626
    tmsgReportStartup("vnode-open", stepDesc);
405,840✔
627

628
    if (pCfg->toVgId) {
405,840✔
629
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
630
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
631
        pThread->failed++;
×
632
        continue;
×
633
      }
634
      pThread->updateVnodesList = true;
×
635
    }
636

637
    int32_t diskPrimary = pCfg->mountId == 0 ? pCfg->diskPrimary : 0;
405,840✔
638
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
405,840✔
639

640
    STfs *pMountTfs = NULL;
405,840✔
641
#ifdef USE_MOUNT
642
    bool releaseTfs = false;
405,840✔
643
    if (pCfg->mountId) {
405,840✔
644
      if (vmAcquireMountTfs(pMgmt, pCfg->mountId, NULL, NULL, &pMountTfs) != 0) {
1,536✔
645
        dError("vgId:%d, failed to get mount tfs by thread:%d", pCfg->vgId, pThread->threadIndex);
×
646
        pThread->failed++;
×
647
        continue;
×
648
      }
649
      releaseTfs = true;
1,536✔
650
    }
651
#endif
652

653
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMountTfs, pMgmt->msgCb, false);
405,164✔
654

655
    if (pImpl == NULL) {
405,767✔
656
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
112✔
657
      if (terrno != TSDB_CODE_NEED_RETRY) {
112✔
658
        pThread->failed++;
112✔
659
#ifdef USE_MOUNT
660
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
112✔
661
#endif
662
        continue;
112✔
663
      }
664
    }
665

666
    if (pImpl != NULL) {
405,655✔
667
      if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
405,655✔
668
        dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
669
        pThread->failed++;
×
670
#ifdef USE_MOUNT
671
        if (releaseTfs) vmReleaseMountTfs(pMgmt, pCfg->mountId, 0);
×
672
#endif
673
        continue;
×
674
      }
675
    }
676

677
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
405,728✔
678
    pThread->opened++;
405,728✔
679
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
405,728✔
680
  }
681

682
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
404,100✔
683
        pThread->opened, pThread->dropped, pThread->failed);
684
  return NULL;
404,100✔
685
}
686

687
#ifdef USE_MOUNT
688
/**
 * Open a TFS handle for every mount returned by vmGetMountListFromFile() and
 * register it in pMgmt->mountTfsHash, keyed by mountId.
 *
 * For each mount the function obtains a file handle via vmMountCheckRunning()
 * (released with taosUnLockFile/taosCloseFile on failure), loads the mount's
 * disk list, validates that it holds 1..TFS_MAX_DISKS entries, opens the TFS
 * and stores a freshly allocated SMountTfs (nRef starts at 1) in the hash.
 *
 * @param pMgmt vnode management object owning mountTfsHash.
 * @return 0 on success; otherwise the first error code encountered. On error
 *         the resources of the mount being processed are released here; mounts
 *         already inserted into the hash are left in place.
 */
static int32_t vmOpenMountTfs(SVnodeMgmt *pMgmt) {
  int32_t    code = 0, lino = 0;
  SMountCfg *pMountCfgs = NULL;
  int32_t    numOfMounts = 0;
  SMountTfs *pMountTfs = NULL;
  TdFilePtr  pFile = NULL;
  SArray    *pDisks = NULL;

  TAOS_CHECK_EXIT(vmGetMountListFromFile(pMgmt, &pMountCfgs, &numOfMounts));

  for (int32_t idx = 0; idx < numOfMounts; ++idx) {
    SMountCfg *pMount = pMountCfgs + idx;

    // A mount id must not already be registered.
    if (taosHashGet(pMgmt->mountTfsHash, &pMount->mountId, sizeof(pMount->mountId)) != NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_INTERNAL_ERROR);
    }

    TAOS_CHECK_EXIT(vmMountCheckRunning(pMount->name, pMount->path, &pFile, 3));
    TAOS_CHECK_EXIT(vmGetMountDisks(pMgmt, pMount->path, &pDisks));

    int32_t diskCount = taosArrayGetSize(pDisks);
    if (!(diskCount >= 1 && diskCount <= TFS_MAX_DISKS)) {
      dError("mount:%s, %" PRIi64 ", %s, invalid number of disks:%d, expect 1 to %d", pMount->name, pMount->mountId,
             pMount->path, diskCount, TFS_MAX_DISKS);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_JSON_FORMAT);
    }

    pMountTfs = taosMemoryCalloc(1, sizeof(SMountTfs));
    TSDB_CHECK_NULL(pMountTfs, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tfsOpen(TARRAY_GET_ELEM(pDisks, 0), TARRAY_SIZE(pDisks), &pMountTfs->pTfs));

    pMountTfs->nRef = 1;
    pMountTfs->pFile = pFile;
    (void)snprintf(pMountTfs->path, sizeof(pMountTfs->path), "%s", pMount->path);
    (void)snprintf(pMountTfs->name, sizeof(pMountTfs->name), "%s", pMount->name);

    TAOS_CHECK_EXIT(
        taosHashPut(pMgmt->mountTfsHash, &pMount->mountId, sizeof(pMount->mountId), &pMountTfs, POINTER_BYTES));

    // The hash entry now references the SMountTfs (and through it the file
    // handle); clear the locals so the error path below does not release them.
    pFile = NULL;
    pMountTfs = NULL;
    taosArrayDestroy(pDisks);
    pDisks = NULL;
  }

_exit:
  if (code != 0) {
    dError("failed to open mount tfs at line %d since %s", lino, tstrerror(code));
    if (pFile != NULL) {
      (void)taosUnLockFile(pFile);
      (void)taosCloseFile(&pFile);
    }
    if (pMountTfs != NULL) {
      tfsClose(pMountTfs->pTfs);
      taosMemoryFree(pMountTfs);
    }
    taosArrayDestroy(pDisks);
  }
  taosMemoryFree(pMountCfgs);
  TAOS_RETURN(code);
}
739
#endif
740
/**
 * Create the vnode hashes, read the vnode list from disk and open every vnode
 * using up to tsNumOfCores / 2 worker threads (vmOpenVnodeInThread).
 *
 * Vnode configs are distributed round-robin over the workers; only workers
 * that received at least one vnode get a thread. After all threads are joined
 * the opened + dropped counters must equal the total, otherwise the function
 * fails. When a worker flagged updateVnodesList the vnode list file is
 * rewritten. With USE_MOUNT, mount TFS entries whose nRef is <= 1 are closed
 * and removed, and the mount list file is rewritten if anything was removed.
 *
 * @param pMgmt vnode management object.
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when a hash cannot be created;
 *         terrno on allocation failure; TSDB_CODE_VND_INIT_FAILED (also stored
 *         in terrno) when not every vnode was opened or dropped; or the error
 *         returned while reading/writing the list files.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->runngingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->runngingHash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  int32_t      code = 0;
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
    return code;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // Round-robin distribution gives each worker at most ceil(n / threadNum) vnodes.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // The distribution loop below writes through pCfgs unconditionally, so
      // bail out instead of dereferencing NULL.
      code = terrno;
      dError("failed to allocate memory for thread:%d since %s", t, tstrerror(code));
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // The vnodes of this worker stay unopened; the counter check below
      // turns that into TSDB_CODE_VND_INIT_FAILED.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return terrno = TSDB_CODE_VND_INIT_FAILED;
  }

  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
    dError("failed to write vnode list since %s", tstrerror(code));
    return code;
  }

#ifdef USE_MOUNT
  bool  updateMountList = false;
  void *pIter = NULL;
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
    SMountTfs *pMountTfs = *(SMountTfs **)pIter;
    // nRef starts at 1 when the mount is registered; a value <= 1 here means
    // no vnode acquired this mount while opening.
    if (pMountTfs && atomic_load_32(&pMountTfs->nRef) <= 1) {
      size_t  keyLen = 0;
      int64_t mountId = *(int64_t *)taosHashGetKey(pIter, &keyLen);
      dInfo("mount:%s, %s, %" PRIi64 ", ref:%d, remove unused mount tfs", pMountTfs->name, pMountTfs->path, mountId,
            atomic_load_32(&pMountTfs->nRef));
      if (pMountTfs->pFile) {
        (void)taosUnLockFile(pMountTfs->pFile);
        (void)taosCloseFile(&pMountTfs->pFile);
      }
      tfsClose(pMountTfs->pTfs);
      if ((code = taosHashRemove(pMgmt->mountTfsHash, &mountId, keyLen)) != 0) {
        dWarn("failed at line %d to remove mount:%s, %s, %" PRIi64 " from mount tfs hash since %s", __LINE__,
              pMountTfs->name, pMountTfs->path, mountId, tstrerror(code));
      }
      // Free only after the warning above has finished reading name/path.
      taosMemoryFree(pMountTfs);
      updateMountList = true;
    }
  }
  if (updateMountList && (code = vmWriteMountListToFile(pMgmt)) != 0) {
    dError("failed to write mount list at line %d since %s", __LINE__, tstrerror(code));
    return code;
  }
#endif

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
870

871
static void *vmCloseVnodeInThread(void *param) {
2,126,595✔
872
  SVnodeThread *pThread = param;
2,126,595✔
873
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
2,126,595✔
874

875
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
2,126,595✔
876
  setThreadName("close-vnodes");
2,126,595✔
877
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
2,126,595✔
878

879
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
4,287,749✔
880
    SVnodeObj *pVnode = pThread->ppVnodes[v];
2,161,154✔
881

882
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
2,161,154✔
883
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
2,161,154✔
884
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
885
    tmsgReportStartup("vnode-close", stepDesc);
2,160,917✔
886

887
    vmCloseVnode(pMgmt, pVnode, false, false);
2,160,472✔
888
  }
889

890
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
2,126,595✔
891
  return NULL;
2,126,595✔
892
}
893

894
/**
 * Stop the management workers, close every vnode found in the hash and tear
 * down all vnode-related hashes (and, with USE_MOUNT, every mount TFS).
 *
 * Vnodes are distributed round-robin over up to tsNumOfCores / 2 threads that
 * run vmCloseVnodeInThread. Whenever a vnode cannot be handed to a worker
 * (thread table or per-thread slot array could not be allocated, or the thread
 * could not be created) it is closed in the calling thread instead, so no
 * vnode is left open because of a resource failure.
 *
 * @param pMgmt vnode management object. If the vnode list cannot be obtained
 *              the function logs the error and returns without further
 *              cleanup.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s, close vnodes in current thread", terrstr());
    threadNum = 0;  // no workers: every vnode takes the serial path below
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    // May be NULL on allocation failure; handled during distribution.
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
  }

  // Reset before any vnode is closed (serially or by a worker).
  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    if (ppVnodes == NULL) break;

    SVnodeThread *pThread = (threadNum > 0) ? &threads[v % threadNum] : NULL;
    if (pThread != NULL && pThread->ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    } else {
      // No worker slot available for this vnode: close it right here.
      vmCloseVnode(pMgmt, ppVnodes[v], false, false);
    }
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
      // Fall back to closing this worker's vnodes in the current thread.
      for (int32_t i = 0; i < pThread->vnodeNum; ++i) {
        vmCloseVnode(pMgmt, pThread->ppVnodes[i], false, false);
      }
      pThread->vnodeNum = 0;  // nothing to join for this worker
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  // Vnode objects still parked in the closed hash are freed here.
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
  }

  if (pMgmt->closedHash != NULL) {
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  // Same for vnode objects still registered in the creating hash.
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
  }

  if (pMgmt->creatingHash != NULL) {
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

#ifdef USE_MOUNT
  pIter = NULL;
  while ((pIter = taosHashIterate(pMgmt->mountTfsHash, pIter))) {
    SMountTfs *mountTfs = *(SMountTfs **)pIter;
    if (mountTfs->pFile) {
      (void)taosUnLockFile(mountTfs->pFile);
      (void)taosCloseFile(&mountTfs->pFile);
    }
    tfsClose(mountTfs->pTfs);
    taosMemoryFree(mountTfs);
  }
  taosHashCleanup(pMgmt->mountTfsHash);
  pMgmt->mountTfsHash = NULL;
#endif

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
1009

1010
/**
 * Release everything owned by the vnode management object: close all vnodes,
 * stop the workers, clean up the vnode module, destroy the hash rwlock and
 * the mutex, and finally free pMgmt itself.
 *
 * @param pMgmt management object to destroy; NULL is accepted and ignored
 *              (vmInit reaches its failure path with a NULL object when the
 *              initial allocation fails).
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  taosMemoryFree(pMgmt);
}
744,355✔
1018

1019
/**
 * Run vnodeSyncCheckTimeout() on every vnode currently in the hash that is not
 * marked failed, releasing each vnode obtained from vmGetVnodeListFromHash().
 *
 * @param pMgmt vnode management object.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  SVnodeObj **vnodeList = NULL;
  int32_t     vnodeCount = 0;

  int32_t code = vmGetVnodeListFromHash(pMgmt, &vnodeCount, &vnodeList);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  if (vnodeList == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    SVnodeObj *pVnode = vnodeList[idx];
    if (!pVnode->failed) {
      vnodeSyncCheckTimeout(pVnode->pImpl);
    }
    // Every listed vnode is released, whether or not it was checked.
    vmReleaseVnode(pMgmt, pVnode);
  }

  taosMemoryFree(vnodeList);
}
1040

1041
static void *vmThreadFp(void *param) {
744,243✔
1042
  SVnodeMgmt *pMgmt = param;
744,243✔
1043
  int64_t     lastTime = 0;
744,243✔
1044
  setThreadName("vnode-timer");
744,243✔
1045
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
744,243✔
1046

1047
  while (1) {
587,588,329✔
1048
    lastTime++;
588,332,572✔
1049
    taosMsleep(100);
588,332,572✔
1050
    if (pMgmt->stop) break;
588,332,572✔
1051
    if (lastTime % 10 != 0) continue;
587,588,329✔
1052

1053
    int64_t sec = lastTime / 10;
58,434,705✔
1054
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
58,434,705✔
1055
      vmCheckSyncTimeout(pMgmt);
1,643,974✔
1056
    }
1057
  }
1058

1059
  return NULL;
744,243✔
1060
}
1061

1062
/**
 * Start the joinable timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * @param pMgmt vnode management object passed to the thread.
 * @return 0 on success, or TAOS_SYSTEM_ERROR(ERRNO) when the thread cannot be
 *         created. The thread attribute object is destroyed on both paths.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture the error before any further call can overwrite it.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
1079

1080
/**
 * Ask the timer thread to stop by setting pMgmt->stop, then join and clear
 * the thread handle if it is valid.
 *
 * @param pMgmt vnode management object owning the timer thread.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
744,243✔
1087

1088
/**
 * Allocate and initialize the vnode management object.
 *
 * Steps, in order: copy the input options, initialize the hash rwlock and the
 * mutex, take over the TFS handle, (with USE_MOUNT) create the mount hash and
 * open mount TFS instances, initialize wal, sync and the vnode module, start
 * the workers, open all vnodes and open udfc. Startup progress is reported
 * through tmsgReportStartup() after each stage.
 *
 * @param pInput  input options (data, path, name, message callbacks, TFS,
 *                stopDnodeFp).
 * @param pOutput receives the new management object in pOutput->pMgmt on
 *                success only.
 * @return 0 on success; a non-zero error code otherwise. Every failure goes
 *         through the common exit path, which logs the error and releases the
 *         partially initialized object with vmCleanup().
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    code = terrno;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  // Override the queue callbacks with the vnode-management implementations.
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    // code is 0 here after the successful mutex init; set an error so the
    // exit path does not report success.
    code = TSDB_CODE_INTERNAL_ERROR;
    goto _OVER;
  }
#ifdef USE_MOUNT
  if (!(pMgmt->mountTfsHash =
            taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK))) {
    dError("failed to init mountTfsHash since %s", terrstr());
    // Use the common exit path so pMgmt and its locks are released.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if ((code = vmOpenMountTfs(pMgmt)) != 0) {
    goto _OVER;
  }
#endif
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is NULL when the initial allocation failed; nothing to clean up.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}
1181

1182
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
749,572✔
1183
  *required = tsNumOfSupportVnodes > 0;
749,572✔
1184
  return 0;
749,572✔
1185
}
1186

1187
static void *vmRestoreVnodeInThread(void *param) {
403,652✔
1188
  SVnodeThread *pThread = param;
403,652✔
1189
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
403,652✔
1190

1191
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
403,652✔
1192
  setThreadName("restore-vnodes");
403,652✔
1193
  taosSetCpuAffinity(THREAD_CAT_MANAGEMENT);
403,652✔
1194

1195
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
809,044✔
1196
    SVnodeObj *pVnode = pThread->ppVnodes[v];
405,392✔
1197
    if (pVnode->failed) {
405,392✔
1198
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
1199
      continue;
×
1200
    }
1201

1202
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
405,392✔
1203
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
405,392✔
1204
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
1205
    tmsgReportStartup("vnode-restore", stepDesc);
405,392✔
1206

1207
    int32_t code = vnodeStart(pVnode->pImpl);
405,392✔
1208
    if (code != 0) {
405,392✔
1209
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
1210
      pThread->failed++;
×
1211
    } else {
1212
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
405,392✔
1213
      pThread->opened++;
405,392✔
1214
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
405,392✔
1215
    }
1216
  }
1217

1218
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
403,652✔
1219
        pThread->failed);
1220
  return NULL;
403,652✔
1221
}
1222

1223
/**
 * Start (restore) every vnode in the hash using up to tsNumOfCores / 2 worker
 * threads running vmRestoreVnodeInThread, then start the timer thread.
 *
 * The vnode list from vmGetVnodeListFromHash() is distributed round-robin
 * over the workers. All vnodes in the list are released with vmReleaseVnode()
 * and all bookkeeping memory is freed on every path, including allocation
 * failures.
 *
 * @param pMgmt vnode management object.
 * @return the error from vmGetVnodeListFromHash(); terrno when the thread
 *         table or a per-thread slot array cannot be allocated (no vnode is
 *         restored in that case); otherwise the result of vmInitTimer().
 *         Failures of individual vnodes or of thread creation are only logged.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
    if (threads[t].ppVnodes == NULL) {
      // Without the slot array this worker's vnodes would be skipped
      // silently; fail the whole start instead.
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  // Drop the references taken by vmGetVnodeListFromHash on every path.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to start vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1307

1308
/**
 * Stop hook of the vnode management node: stops and joins the timer thread.
 *
 * @param pMgmt vnode management object.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
744,243✔
1309

1310
SMgmtFunc vmGetMgmtFunc() {
749,572✔
1311
  SMgmtFunc mgmtFunc = {0};
749,572✔
1312
  mgmtFunc.openFp = vmInit;
749,572✔
1313
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
749,572✔
1314
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
749,572✔
1315
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
749,572✔
1316
  mgmtFunc.requiredFp = vmRequire;
749,572✔
1317
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
749,572✔
1318

1319
  return mgmtFunc;
749,572✔
1320
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc