• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.64
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

23
/*
 * Look up the primary disk of a vnode that is registered in the running hash.
 *
 * The lookup is done under the read side of pMgmt->hashLock.
 * Returns the vnode's diskPrimary, or -1 when no entry for vgId was found.
 */
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pRunning = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  // the status of the lookup is not used; the result is judged by whether pRunning was filled in
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pRunning);
  int32_t primary = (pRunning != NULL) ? pRunning->diskPrimary : -1;
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return primary;
}
35

36
/*
 * Free a vnode object (its path string and the object itself) and set the
 * caller's pointer to NULL.
 *
 * While the object's reference count is still positive the function warns and
 * sleeps 200ms, then re-reads the count; it only frees once the count is <= 0.
 * A NULL ppVnode or a NULL *ppVnode is ignored.
 */
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  for (int32_t ref = atomic_load_32(&pObj->refCount); ref > 0; ref = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, ref);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

53
/*
 * Record that vnode vgId is being created on disk diskId by inserting a
 * placeholder SVnodeObj (only vgId and diskPrimary set) into creatingHash
 * under the write side of pMgmt->hashLock.
 *
 * Returns 0 on success. On failure returns terrno (placeholder allocation),
 * or the code of the failing lock / hash-put call; the placeholder is freed
 * on every failure path.
 */
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  SVnodeObj *pPlaceholder = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pPlaceholder == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pPlaceholder, 0, sizeof(SVnodeObj));
  pPlaceholder->diskPrimary = diskId;
  pPlaceholder->vgId = vgId;

  int32_t code = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (code != 0) {
    taosMemoryFree(pPlaceholder);
    return code;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pPlaceholder);
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pPlaceholder, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pPlaceholder);
  }

  // an unlock failure is only logged; the result of the insert is what the caller gets
  int32_t unlockCode = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (unlockCode != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(unlockCode));
  }

  return code;
}
85

86
/*
 * Remove the "creating" placeholder of vnode vgId from creatingHash and free it.
 *
 * The lookup and the removal happen under the write side of pMgmt->hashLock;
 * the placeholder itself is freed after the lock has been released.
 * Failures of the hash calls are logged and otherwise ignored.
 */
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  if (pOld) {
    // log the object being freed, not the address of the local pointer variable
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    vmFreeVnodeObj(&pOld);
  }
}
111

112
/*
 * Choose the level-0 disk that will be the primary disk of vnode vgId.
 *
 * - Without a tfs (pMgmt->pTfs == NULL) disk 0 is returned.
 * - If tfsSearch finds the vnode's info file (or its temporary variant) on
 *   level 0, the id it reports is returned.
 * - Otherwise, with pMgmt->mutex held, the vnodes returned by
 *   vmGetAllVnodeListFromHashWithCreating are counted per primary disk, the
 *   disk with the fewest vnodes is chosen (lowest id wins ties) and the choice
 *   is recorded through vmRegisterCreatingState.
 *
 * Returns the chosen disk id on success, or the error code of the failing step.
 * NOTE(review): disk ids and error codes share the return value; confirm how
 * callers tell them apart.
 */
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  // count vnodes per primary disk; never index disks[] with an id outside the array
  for (int32_t v = 0; v < numOfVnodes; v++) {
    if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
    SVnodeObj *pVnode = ppVnodes[v];
    int32_t    primary = pVnode->diskPrimary;
    if (primary < 0 || primary >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, ignore vnode with invalid primary disk:%d", pVnode->vgId, primary);
      continue;
    }
    disks[primary] += 1;
  }

  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  // only scan ids that exist in disks[]
  int32_t nscan = (ndisk > TFS_MAX_DISKS_PER_TIER) ? TFS_MAX_DISKS_PER_TIER : ndisk;
  diskId = 0;
  for (int32_t id = 0; id < nscan; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
  int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
  if (code != 0) {
    // the registration error is what gets reported; an unlock failure is only logged
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
  } else {
    code = r;
  }

  // drop the references held on the listed vnodes and free the list
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  } else {
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
    return diskId;
  }
}
202

203
/* Remove and free the creatingHash entry of vnode vgId. */
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
204

205
/*
 * Look up vnode vgId in the running hash and take a reference on it.
 *
 * With strict set, a vnode flagged as dropped or failed is treated as absent.
 * On success the vnode's refCount is incremented and the vnode is returned;
 * otherwise terrno is set to TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is
 * returned. The whole operation runs under the read side of pMgmt->hashLock.
 */
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pFound = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->hashLock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);

  bool usable = (pFound != NULL) && !(strict && (pFound->dropped || pFound->failed));
  if (usable) {
    int32_t ref = atomic_add_fetch_32(&pFound->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pFound->vgId, pFound, ref);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pFound = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return pFound;
}
221

222
/* Strict acquire: same as vmAcquireVnodeImpl with strict == true. */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  return vmAcquireVnodeImpl(pMgmt, vgId, true);
}
223

224
/*
 * Give back one reference on pVnode by atomically decrementing its refCount.
 * A NULL pVnode is ignored. pMgmt is not used by the body.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
  }
}
232

233
/*
 * Put pVnode into the running hash under its vgId.
 *
 * An object already stored under the same vgId is freed first.
 * Takes no lock itself; vmOpenVnode calls it with hashLock write-locked.
 * Returns the result of taosHashPut.
 */
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pPrev = NULL;

  if (taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pPrev) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pPrev != NULL) {
    vmFreeVnodeObj(&pPrev);
  }

  return taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
247

248
/*
 * Remove the entry of vgId from the running hash; a failure is only logged.
 * The vnode object itself is not freed here.
 */
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);

  int32_t rc = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (rc == 0) return;

  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(rc));
}
255

256
/*
 * Remember a closed vnode: store a reduced copy of pVnode (vgId, dropped,
 * vgVersion, diskPrimary, toVgId) in closedHash under its vgId.
 *
 * An object already stored under the same vgId is freed first.
 * Takes no lock itself; vmCloseVnode calls it with hashLock write-locked.
 *
 * Returns 0 on success, terrno if the copy cannot be allocated, or the
 * taosHashPut error if the insert fails. In both failure cases nothing new is
 * left allocated.
 */
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }
  (void)memset(pClosedVnode, 0, sizeof(SVnodeObj));

  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }
  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (r != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // the hash did not take the pointer, so nobody else would ever free the copy
    taosMemoryFree(pClosedVnode);
    return r;
  }

  return 0;
}
287

288
/*
 * Forget the closed-state record of pVnode->vgId, if there is one: free the
 * object stored in closedHash and remove the entry.
 * Takes no lock itself; vmOpenVnode calls it with hashLock write-locked.
 */
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pStale = NULL;

  int32_t rc = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pStale);
  if (rc != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pStale == NULL) return;

  vmFreeVnodeObj(&pStale);
  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  rc = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  if (rc != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
}
303

304
/*
 * Create the management object for a vnode and register it as running.
 *
 * pCfg supplies vgId, vgVersion, diskPrimary and path (the path is duplicated).
 * With a non-NULL pImpl the vnode's queues are allocated through vmAllocQueue;
 * with a NULL pImpl the object is registered with failed = 1 instead.
 * Registration in the running hash and removal of a closed-state record for
 * the same vgId happen under the write side of pMgmt->hashLock.
 *
 * Returns the result of vmRegisterRunningState, or -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when an allocation step fails (everything allocated
 * so far is freed).
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->diskPrimary = pCfg->diskPrimary;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->failed = 0;
  pObj->path = taosStrdup(pCfg->path);
  pObj->pImpl = pImpl;

  if (pObj->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  if (pImpl == NULL) {
    // no implementation object: keep the vnode visible but flagged as failed
    pObj->failed = 1;
  } else if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  int32_t result = vmRegisterRunningState(pMgmt, pObj);
  vmUnRegisterClosedState(pMgmt, pObj);
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  return result;
}
344

345
/*
 * Close one vnode and free its management object.
 *
 * Steps, in order:
 *  1. if the vnode has an implementation and is leader, propose a commit;
 *  2. under the hash write lock, remove it from the running hash and, when
 *     keepClosed is set, record it in the closed hash;
 *  3. release one reference on the vnode;
 *  4. unless the vnode is flagged failed: pre-close it, wait until its
 *     refCount drops to 0, drain/clean up all of its queues, post-close it,
 *     free the queues, optionally commit (commitAndRemoveWal), then close the
 *     implementation;
 *  5. when commitAndRemoveWal is set, remove and re-create the wal directory;
 *  6. when the vnode is flagged dropped, destroy its data through vnodeDestroy;
 *  7. free the SVnodeObj (pVnode must not be used by the caller afterwards).
 *
 * NOTE(review): if recording the closed state fails, the function returns
 * right after step 2 without closing or freeing the vnode - confirm this is
 * intended.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Initialised here because the failed-vnode path jumps straight to _closed and
  // may still hand nodeId to vnodeDestroy; 0 is also what vmOpenVnodeInThread
  // passes when it destroys a dropped vnode that was never opened.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->hashLock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      (void)taosThreadRwlockUnlock(&pMgmt->hashLock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->hashLock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
  while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ);
  while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  // read the node id before the implementation object is closed
  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
457

458
/*
 * Remove vnode vgId from the running hash under the write side of
 * pMgmt->hashLock. The vnode object itself is not freed here.
 *
 * If the lock cannot be taken the failure is logged and nothing else is done;
 * in particular the lock is not unlocked, because it is not held.
 */
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->hashLock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
472

473
/*
 * Finish a pending vgroup-id change recorded in pCfg (toVgId != 0) by calling
 * vnodeRestoreVgroupId with the source and destination vnode directories.
 *
 * Returns 0 when there is nothing to do (toVgId == 0) or after success, in
 * which case pCfg->vgId becomes the id reported by vnodeRestoreVgroupId and
 * pCfg->toVgId is reset to 0. Returns -1 when the reported id is <= 0.
 */
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromVgId = pCfg->vgId;
  const int32_t toVgId = pCfg->toVgId;
  if (toVgId == 0) {
    return 0;
  }

  char fromPath[TSDB_FILENAME_LEN];
  char toPath[TSDB_FILENAME_LEN];
  snprintf(fromPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromVgId);
  snprintf(toPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toVgId);

  int32_t restoredId = vnodeRestoreVgroupId(fromPath, toPath, fromVgId, toVgId, pCfg->diskPrimary, pTfs);
  if (restoredId <= 0) {
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, fromPath);
    return -1;
  }

  pCfg->vgId = restoredId;
  pCfg->toVgId = 0;
  return 0;
}
495

496
static void *vmOpenVnodeInThread(void *param) {
965✔
497
  SVnodeThread *pThread = param;
965✔
498
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
965✔
499
  char          path[TSDB_FILENAME_LEN];
500

501
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
965!
502
  setThreadName("open-vnodes");
966✔
503

504
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
1,933✔
505
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
967✔
506
    if (pCfg->dropped) {
967!
507
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
508
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
509
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
510
      tmsgReportStartup("vnode-destroy", stepDesc);
×
511

512
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
513
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
514
      pThread->updateVnodesList = true;
×
515
      pThread->dropped++;
×
516
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
517
      continue;
×
518
    }
519

520
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
967✔
521
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
967✔
522
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
523
    tmsgReportStartup("vnode-open", stepDesc);
967✔
524

525
    if (pCfg->toVgId) {
967!
526
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
527
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
528
        pThread->failed++;
×
529
        continue;
×
530
      }
531
      pThread->updateVnodesList = true;
×
532
    }
533

534
    int32_t diskPrimary = pCfg->diskPrimary;
967✔
535
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
967✔
536

537
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
967✔
538

539
    if (pImpl == NULL) {
968!
540
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
541
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
542
        pThread->failed++;
×
543
        continue;
×
544
      }
545
    }
546

547
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
968!
548
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
549
      pThread->failed++;
×
550
      continue;
×
551
    }
552

553
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
968!
554
    pThread->opened++;
968✔
555
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
968✔
556
  }
557

558
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
966!
559
        pThread->opened, pThread->dropped, pThread->failed);
560
  return NULL;
966✔
561
}
562

563
/*
 * Create the running/closed/creating hashes, read the vnode list from disk and
 * open (or destroy, for dropped entries) every listed vnode, spreading the
 * work round-robin over tsNumOfCores / 2 threads (at least one).
 *
 * After all threads have been joined the vnode list file is rewritten if any
 * thread asked for it. Returns 0 on success; an error code if a hash cannot be
 * created, the list cannot be read or written, memory runs out, or the number
 * of opened plus dropped vnodes does not match the number listed.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->runngingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->runngingHash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  int32_t      code = 0;
  if ((code = vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes)) != 0) {
    dInfo("failed to get vnode list from disk since %s", tstrerror(code));
    return code;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // +1 so that the round-robin distribution below always fits
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // the distribution loop writes into every list, so a missing one must stop the startup
      code = terrno;
      dError("failed to allocate memory for thread:%d since %s", t, terrstr());
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return code;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // vnodes of a thread that never started stay unopened and are caught by the count check below
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return terrno = TSDB_CODE_VND_INIT_FAILED;
  }

  if (updateVnodesList && (code = vmWriteVnodeListToFile(pMgmt)) != 0) {
    dError("failed to write vnode list since %s", tstrerror(code));
    return code;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
664

665
static void *vmCloseVnodeInThread(void *param) {
5,955✔
666
  SVnodeThread *pThread = param;
5,955✔
667
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
5,955✔
668

669
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
5,955!
670
  setThreadName("close-vnodes");
5,956✔
671

672
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
11,975✔
673
    SVnodeObj *pVnode = pThread->ppVnodes[v];
6,018✔
674

675
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
6,018✔
676
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
6,018✔
677
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
678
    tmsgReportStartup("vnode-close", stepDesc);
6,018✔
679

680
    vmCloseVnode(pMgmt, pVnode, false, false);
6,017✔
681
  }
682

683
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
5,957!
684
  return NULL;
5,957✔
685
}
686

687
/*
 * Shut down all vnodes of this dnode.
 *
 * Stops the two mgmt workers, notifies the stream module of every vnode
 * (tqNotifyClose), closes the vnodes in parallel on tsNumOfCores / 2 threads
 * (at least one) and finally frees what is left in the closed/creating hashes
 * and destroys all three hashes.
 *
 * If the thread bookkeeping cannot be allocated, the affected vnodes are
 * closed in the calling thread instead of being skipped.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s, close vnodes in current thread", terrstr());
  }
  // number of usable worker slots: threadNum, or 0 when the array is missing
  int32_t workerNum = (threads != NULL) ? threadNum : 0;

  for (int32_t t = 0; t < workerNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      dError("thread:%d, failed to allocate vnode list since %s, close its vnodes in current thread", t, terrstr());
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    if (workerNum == 0 || ppVnodes == NULL) break;
    SVnodeThread *pThread = &threads[v % workerNum];
    if (pThread->ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  int64_t st = taosGetTimestampMs();
  dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] != NULL) {
        if (ppVnodes[i]->pImpl != NULL) {
          tqNotifyClose(ppVnodes[i]->pImpl->pTq);
        }
      }
    }
  }

  int64_t et = taosGetTimestampMs();
  dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);

  for (int32_t t = 0; t < workerNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
    (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Close, in this thread, the vnodes that were not handed to any worker because an
  // allocation above failed. Only unassigned entries are dereferenced here: assigned
  // ones may already have been freed by their worker.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    if (ppVnodes == NULL) break;
    bool assigned = (workerNum > 0) && (threads[v % workerNum].ppVnodes != NULL);
    if (!assigned && ppVnodes[v] != NULL) {
      vmCloseVnode(pMgmt, ppVnodes[v], false, false);
    }
  }

  for (int32_t t = 0; t < workerNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  if (pMgmt->closedHash != NULL) {
    // free the objects still recorded as closed before the hash itself goes away
    void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->closedHash, pIter);
    }
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  if (pMgmt->creatingHash != NULL) {
    // same for placeholders of vnodes whose creation never finished
    void *pIter = taosHashIterate(pMgmt->creatingHash, NULL);
    while (pIter) {
      SVnodeObj **ppVnode = pIter;
      vmFreeVnodeObj(ppVnode);
      pIter = taosHashIterate(pMgmt->creatingHash, pIter);
    }
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
802

803
/**
 * Tear down the vnode management module and free the manager.
 *
 * In order: closes the vnodes, stops the workers, cleans up the vnode
 * subsystem, destroys the hash rwlock and the two mutexes, then frees pMgmt.
 *
 * @param pMgmt manager to destroy. NULL is accepted and ignored, because a
 *              caller on an initialization-failure path may not have managed
 *              to allocate a manager at all.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  // Every step below dereferences pMgmt, so there is nothing to do without one.
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  (void)taosThreadRwlockDestroy(&pMgmt->hashLock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);
  taosMemoryFree(pMgmt);
}
2,125✔
812

813
/**
 * Run the sync timeout check on every vnode returned by
 * vmGetVnodeListFromHash().
 *
 * Vnodes flagged as failed are skipped for the check itself, but every entry
 * of the list is handed back through vmReleaseVnode() and the list is freed.
 *
 * @param pMgmt vnode manager whose vnodes are inspected.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  SVnodeObj **vnodeList = NULL;
  int32_t     vnodeCount = 0;

  int32_t rc = vmGetVnodeListFromHash(pMgmt, &vnodeCount, &vnodeList);
  if (rc != 0) {
    dError("failed to get vnode list since %s", tstrerror(rc));
    return;
  }

  // No list was produced, so there is nothing to check or free.
  if (vnodeList == NULL) return;

  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    SVnodeObj *pCur = vnodeList[idx];
    if (!pCur->failed) {
      vnodeSyncCheckTimeout(pCur->pImpl);
    }
    vmReleaseVnode(pMgmt, pCur);
  }

  taosMemoryFree(vnodeList);
}
834

835
static void *vmThreadFp(void *param) {
2,125✔
836
  SVnodeMgmt *pMgmt = param;
2,125✔
837
  int64_t     lastTime = 0;
2,125✔
838
  setThreadName("vnode-timer");
2,125✔
839

840
  while (1) {
1,122,561✔
841
    lastTime++;
1,124,686✔
842
    taosMsleep(100);
1,124,686✔
843
    if (pMgmt->stop) break;
1,124,686✔
844
    if (lastTime % 10 != 0) continue;
1,122,561✔
845

846
    int64_t sec = lastTime / 10;
111,326✔
847
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
111,326✔
848
      vmCheckSyncTimeout(pMgmt);
2,979✔
849
    }
850
  }
851

852
  return NULL;
2,125✔
853
}
854

855
/**
 * Start the joinable timer thread (vmThreadFp) for the vnode manager.
 *
 * The thread handle is stored in pMgmt->thread. The thread attribute object
 * is destroyed on both the success and the failure path.
 *
 * @param pMgmt vnode manager that owns the timer thread.
 * @return 0 on success, otherwise TAOS_SYSTEM_ERROR(ERRNO) captured right
 *         after the failed thread creation.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Capture the error before any further call gets a chance to overwrite it.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  // Release the attribute object no matter how thread creation went.
  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
872

873
/**
 * Ask the timer thread to stop and wait for it to finish.
 *
 * Sets pMgmt->stop, then joins and clears pMgmt->thread if the handle is
 * reported valid by taosCheckPthreadValid().
 *
 * @param pMgmt vnode manager that owns the timer thread.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  // Without a valid thread handle there is nothing to join.
  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
2,125✔
880

881
/**
 * Create and initialize the vnode manager.
 *
 * Allocates the manager, copies the input options into it, installs the
 * queue callbacks, initializes the hash rwlock and the two mutexes, then
 * brings up wal, sync, vnode, the workers, the vnodes and udfc in that order,
 * reporting startup progress along the way.
 *
 * @param pInput  options supplied by the caller (data, path, name, message
 *                callbacks, tfs handle, stop-dnode callback).
 * @param pOutput receives the new manager in pOutput->pMgmt on success only.
 * @return 0 on success, a non-zero code on failure. On failure after the
 *         manager was allocated, vmCleanup() is called on it.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Nothing has been set up yet, so return directly instead of running the
    // cleanup path on a NULL manager.
    code = terrno;
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    return code;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    // code is still 0 from the mutex init above; without an explicit failure
    // value the function would report success for an unusable manager.
    // NOTE(review): -1 mirrors this function's initial failure value; a more
    // specific project error code may be preferable.
    code = -1;
    dError("tfs is null.");
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");
  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }

  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    vmCleanup(pMgmt);
  }

  return code;
}
970

971
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
2,161✔
972
  *required = tsNumOfSupportVnodes > 0;
2,161✔
973
  return 0;
2,161✔
974
}
975

976
static void *vmRestoreVnodeInThread(void *param) {
966✔
977
  SVnodeThread *pThread = param;
966✔
978
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
966✔
979

980
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
966!
981
  setThreadName("restore-vnodes");
966✔
982

983
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
1,934✔
984
    SVnodeObj *pVnode = pThread->ppVnodes[v];
968✔
985
    if (pVnode->failed) {
968!
986
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
987
      continue;
×
988
    }
989

990
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
968✔
991
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
968✔
992
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
993
    tmsgReportStartup("vnode-restore", stepDesc);
968✔
994

995
    int32_t code = vnodeStart(pVnode->pImpl);
968✔
996
    if (code != 0) {
968!
997
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
998
      pThread->failed++;
×
999
    } else {
1000
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
968!
1001
      pThread->opened++;
968✔
1002
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
968✔
1003
    }
1004
  }
1005

1006
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
966!
1007
        pThread->failed);
1008
  return NULL;
966✔
1009
}
1010

1011
/**
 * Restore all vnodes in parallel, then start the vnode timer.
 *
 * The vnode list from vmGetVnodeListFromHash() is spread round-robin over
 * tsNumOfCores / 2 worker slots (at least one). A joinable thread running
 * vmRestoreVnodeInThread() is created for every slot that received vnodes,
 * and all created threads are joined. A thread that cannot be created is
 * logged and skipped. Afterwards every listed vnode is released, the list is
 * freed, and vmInitTimer() is called.
 *
 * If any of the bookkeeping allocations fails, no thread is started. The
 * partially built state and the vnode list are released through the same
 * cleanup path, and the allocation error is returned.
 *
 * @param pMgmt vnode manager whose vnodes are restored.
 * @return the vmInitTimer() result on the normal path, otherwise the error
 *         from fetching the list or from a failed allocation.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // +1 leaves room for the remainder of the round-robin distribution below.
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(*threads[t].ppVnodes));
    if (threads[t].ppVnodes == NULL) {
      // Bail out before any vnode is distributed or any thread is started.
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(ERRNO));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Wait for every worker before its vnode array is freed below.
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  // Shared by the normal and the error path. On the error path no thread was
  // started, so only memory and the listed vnodes need to be given back.
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to restore vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1095

1096
/**
 * Stop hook of the vnode manager: stops and joins the timer thread.
 *
 * @param pMgmt vnode manager being stopped.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
2,125✔
1097

1098
SMgmtFunc vmGetMgmtFunc() {
2,161✔
1099
  SMgmtFunc mgmtFunc = {0};
2,161✔
1100
  mgmtFunc.openFp = vmInit;
2,161✔
1101
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
2,161✔
1102
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
2,161✔
1103
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
2,161✔
1104
  mgmtFunc.requiredFp = vmRequire;
2,161✔
1105
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
2,161✔
1106

1107
  return mgmtFunc;
2,161✔
1108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc