• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "vmInt.h"
18
#include "libs/function/tudf.h"
19
#include "osMemory.h"
20
#include "tfs.h"
21
#include "vnd.h"
22

UNCOV
23
// Returns the primary disk id recorded for vgId in the running hash, or -1
// when the lookup yields no vnode object. The lookup runs under the read lock.
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;
  int32_t    primary = -1;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  // Only the output pointer is inspected; the status code of the lookup is not used.
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    primary = pFound->diskPrimary;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return primary;
}
35

UNCOV
36
// Frees the vnode object referenced by *ppVnode (including its path string) and
// resets *ppVnode to NULL. While the object's refCount is above zero the call
// polls every 200ms and logs a warning before finally freeing it.
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL || *ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;

  for (int32_t ref = atomic_load_32(&pObj->refCount); ref > 0; ref = atomic_load_32(&pObj->refCount)) {
    dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pObj->vgId, pObj, ref);
    taosMsleep(200);
  }

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
52

UNCOV
53
// Records that vnode vgId is being created on disk diskId by inserting a
// placeholder SVnodeObj into pMgmt->creatingHash under the write lock.
//
// Returns 0 on success, terrno when the placeholder cannot be allocated, or the
// failing code from the lock / hash insertion. On every failure path the
// placeholder is freed before returning.
static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) {
  int32_t code = 0;

  // NOTE(review): relies on taosMemoryCalloc zero-filling like calloc(), so no
  // additional memset is issued here - confirm against osMemory.
  SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pCreatingVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }

  pCreatingVnode->vgId = vgId;
  pCreatingVnode->diskPrimary = diskId;

  code = taosThreadRwlockWrlock(&pMgmt->lock);
  if (code != 0) {
    taosMemoryFree(pCreatingVnode);
    return code;
  }

  dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode);
  // The hash stores the pointer value, so the object stays owned by the hash on success.
  code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *));
  if (code != 0) {
    dError("vgId:%d, failed to put vnode to creatingHash", vgId);
    taosMemoryFree(pCreatingVnode);
  }

  int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }

  return code;
}
85

UNCOV
86
// Removes the placeholder for vgId from pMgmt->creatingHash (under the write
// lock) and frees the placeholder object after the lock has been released.
// Lookup and removal failures are logged; the function has no return value.
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pOld = NULL;

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from creating Hash", vgId);
  }
  dTrace("vgId:%d, remove from creating Hash", vgId);
  r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t));
  if (r != 0) {
    dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r));
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  if (pOld) {
    // Log the vnode object itself, not the address of the local pointer variable.
    dTrace("vgId:%d, free vnode pOld:%p", vgId, pOld);
    // vmFreeVnodeObj may sleep while the object is still referenced, so it runs outside the lock.
    vmFreeVnodeObj(&pOld);
  }
}
×
111

UNCOV
112
// Chooses the level-0 disk that will hold vnode vgId.
//
// 1. Without a tfs instance, disk 0 is returned.
// 2. If the vnode's info file (or its temporary variant) already exists on a
//    disk, that disk id is returned.
// 3. Otherwise, with pMgmt->mutex held, the vnodes returned by
//    vmGetAllVnodeListFromHashWithCreating are counted per primary disk, the
//    least used disk is picked (lowest id wins ties) and registered in the
//    creating hash.
//
// Returns the chosen disk id on success; on failure returns the error code of
// the failing step.
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t code = 0;
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (!pTfs) {
    return diskId;
  }

  // search fs
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;

  code = taosThreadMutexLock(&pMgmt->mutex);
  if (code != 0) {
    return code;
  }

  code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    return code;
  }

  // Count vnodes per primary disk. Entries that are NULL or carry a disk id
  // outside the counter array are skipped instead of indexing out of bounds.
  for (int32_t v = 0; ppVnodes != NULL && v < numOfVnodes; v++) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;

    int32_t used = pVnode->diskPrimary;
    if (used < 0 || used >= TFS_MAX_DISKS_PER_TIER) {
      dWarn("vgId:%d, skip invalid primary disk:%d when counting vnodes per disk", pVnode->vgId, used);
      continue;
    }
    disks[used] += 1;
  }

  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  diskId = 0;
  // The second bound keeps the scan inside the counter array.
  for (int32_t id = 0; id < ndisk && id < TFS_MAX_DISKS_PER_TIER; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  code = vmRegisterCreatingState(pMgmt, vgId, diskId);
  if (code != 0) {
    int32_t r = taosThreadMutexUnlock(&pMgmt->mutex);
    if (r != 0) {
      dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r));
    }
    goto _OVER;
  }

  // An unlock failure is reported to the caller through `code`.
  code = taosThreadMutexUnlock(&pMgmt->mutex);

_OVER:

  // Release every vnode obtained from the list call and drop the list itself.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code));
    return code;
  } else {
    dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
    return diskId;
  }
}
202

UNCOV
203
// Drops the creating-state placeholder that was registered for vgId.
void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  vmUnRegisterCreatingState(pMgmt, vgId);
}
×
204

UNCOV
205
// Looks up vgId in the running hash under the read lock and, when usable,
// increments the vnode's refCount and returns it.
//
// A vnode is unusable when it is not found, or when `strict` is set and the
// vnode is flagged dropped or failed. In that case terrno is set to
// TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is returned.
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
  SVnodeObj *pResult = NULL;

  (void)taosThreadRwlockRdlock(&pMgmt->lock);
  (void)taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pResult);

  bool rejected = (pResult == NULL) || (strict && (pResult->dropped || pResult->failed));
  if (!rejected) {
    int32_t ref = atomic_add_fetch_32(&pResult->refCount, 1);
    dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pResult->vgId, pResult, ref);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pResult = NULL;
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return pResult;
}
221

UNCOV
222
// Strict acquire: dropped or failed vnodes are rejected (see vmAcquireVnodeImpl).
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
×
223

UNCOV
224
// Decrements the vnode's refCount by one; a NULL vnode is ignored.
// The decrement is atomic and is performed without taking pMgmt->lock.
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (NULL == pVnode) {
    return;
  }

  int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
  dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, remaining);
}
232

UNCOV
233
// Stores pVnode in the running hash under its vgId. An object already stored
// under that vgId is freed first. Returns the result of taosHashPut.
// No locking is done here; the visible callers take pMgmt->lock before calling.
static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pPrev = NULL;

  if (taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pPrev) != 0) {
    dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
  }
  if (pPrev != NULL) {
    vmFreeVnodeObj(&pPrev);
  }

  return taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
247

UNCOV
248
// Removes the entry for vgId from the running hash and logs a removal failure.
// The vnode object itself is not freed here.
static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) {
  dInfo("vgId:%d, remove from hash", vgId);

  int32_t rc = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t));
  if (rc == 0) {
    return;
  }
  dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(rc));
}
×
255

UNCOV
256
// Stores a lightweight copy of pVnode (vgId, dropped, vgVersion, diskPrimary,
// toVgId) in pMgmt->closedHash. An object already stored under the same vgId is
// freed first.
//
// Returns terrno when the copy cannot be allocated, otherwise 0. A failed hash
// insertion is logged and the copy is freed, but 0 is still returned so the
// caller's close sequence continues as before.
static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  int32_t code = 0;

  // NOTE(review): relies on taosMemoryCalloc zero-filling like calloc(), so no
  // additional memset is issued here - confirm against osMemory.
  SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pClosedVnode == NULL) {
    dError("failed to alloc vnode since %s", terrstr());
    return terrno;
  }

  pClosedVnode->vgId = pVnode->vgId;
  pClosedVnode->dropped = pVnode->dropped;
  pClosedVnode->vgVersion = pVnode->vgVersion;
  pClosedVnode->diskPrimary = pVnode->diskPrimary;
  pClosedVnode->toVgId = pVnode->toVgId;

  SVnodeObj *pOld = NULL;
  int32_t    r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
  if (r != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pOld) {
    vmFreeVnodeObj(&pOld);
  }

  dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
  r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
  if (r != 0) {
    dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
    // The hash did not take the pointer, so nothing else will ever free the copy.
    taosMemoryFree(pClosedVnode);
  }

  return code;
}
287

UNCOV
288
// If closedHash holds an object for pVnode->vgId, frees that object and removes
// the hash entry. Does nothing (beyond logging a lookup error) otherwise.
static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SVnodeObj *pStale = NULL;

  int32_t rc = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pStale);
  if (rc != 0) {
    dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
  }
  if (pStale == NULL) {
    return;
  }

  vmFreeVnodeObj(&pStale);
  dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
  rc = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
  if (rc != 0) {
    dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
  }
}
×
303

UNCOV
304
// Creates the management object for an opened vnode and publishes it in the
// running hash, removing any closed-state entry for the same vgId.
//
// pImpl may be NULL: the object is then flagged as failed and no queues are
// allocated. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when allocation
// or queue setup fails, otherwise the result of vmRegisterRunningState.
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (NULL == pObj) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->diskPrimary = pCfg->diskPrimary;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->failed = 0;
  pObj->pImpl = pImpl;

  pObj->path = taosStrdup(pCfg->path);
  if (NULL == pObj->path) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  if (!pImpl) {
    // No implementation handle: keep the object but mark it as failed.
    pObj->failed = 1;
  } else if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t ret = vmRegisterRunningState(pMgmt, pObj);
  vmUnRegisterClosedState(pMgmt, pObj);
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  return ret;
}
344

UNCOV
345
// Closes one vnode and frees its management object.
//
// Sequence: optionally propose a commit when this vnode is leader, move the
// vnode out of the running hash (and into the closed hash when keepClosed is
// set), release one reference, then - unless the vnode is flagged failed - run
// pre-close, wait for references and queues to drain, run post-close, free the
// queues, optionally commit (commitAndRemoveWal), and close the implementation.
// Afterwards the wal directory is recreated when commitAndRemoveWal is set and
// the vnode directory is destroyed when the vnode is flagged dropped.
//
// If registering the closed state fails, the function returns right after
// unlocking, without closing or freeing the vnode.
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
  char path[TSDB_FILENAME_LEN] = {0};
  bool atExit = true;
  // Initialised up front: `goto _closed` skips the point where the real node id
  // is read, and vnodeDestroy below would otherwise receive an indeterminate
  // value for a vnode that is both failed and dropped.
  int32_t nodeId = 0;

  if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
  }

  (void)taosThreadRwlockWrlock(&pMgmt->lock);
  vmUnRegisterRunningState(pMgmt, pVnode->vgId);
  if (keepClosed) {
    if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
      (void)taosThreadRwlockUnlock(&pMgmt->lock);
      return;
    }
  }
  (void)taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);

  if (pVnode->failed) {
    goto _closed;
  }
  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        taosQueueGetThreadId(pVnode->pWriteW.queue));
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        taosQueueGetThreadId(pVnode->pSyncW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        taosQueueGetThreadId(pVnode->pSyncRdW.queue));
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        taosQueueGetThreadId(pVnode->pApplyW.queue));
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  tqNotifyClose(pVnode->pImpl->pTq);

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);

  dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
        pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
  while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    if (vnodeSyncCommit(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to commit data", pVnode->vgId);
    }
    if (vnodeBegin(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to begin", pVnode->vgId);
    }
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  // Read the node id before the implementation handle is closed.
  nodeId = vnodeNodeId(pVnode->pImpl);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

_closed:
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    if (tfsRmdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to remove wals, path:%s", pVnode->vgId, path);
    }
    if (tfsMkdir(pMgmt->pTfs, path) != 0) {
      dTrace("vgId:%d, failed to create wals, path:%s", pVnode->vgId, path);
    }
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs, nodeId);
  }

  vmFreeVnodeObj(&pVnode);
}
454

455
// Removes vgId from the running hash under the write lock.
// If the lock cannot be taken the failure is logged and nothing is removed;
// the unlock is skipped in that case because the lock is not held.
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  int32_t r = taosThreadRwlockWrlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
    return;
  }

  vmUnRegisterRunningState(pMgmt, vgId);

  r = taosThreadRwlockUnlock(&pMgmt->lock);
  if (r != 0) {
    dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
  }
}
×
469

470
// Completes a pending vgroup id change described by pCfg (vgId -> toVgId).
// Nothing is done when toVgId is 0. On success pCfg->vgId takes the id returned
// by vnodeRestoreVgroupId and pCfg->toVgId is cleared. Returns 0 on success or
// when there is nothing to do, -1 when the restore reports an id <= 0.
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  const int32_t fromVgId = pCfg->vgId;
  const int32_t toVgId = pCfg->toVgId;
  if (toVgId == 0) {
    return 0;
  }

  char fromPath[TSDB_FILENAME_LEN];
  char toPath[TSDB_FILENAME_LEN];
  snprintf(fromPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromVgId);
  snprintf(toPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toVgId);

  int32_t restored = vnodeRestoreVgroupId(fromPath, toPath, fromVgId, toVgId, pCfg->diskPrimary, pTfs);
  if (restored <= 0) {
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, fromPath);
    return -1;
  }

  pCfg->vgId = restored;
  pCfg->toVgId = 0;
  return 0;
}
492

UNCOV
493
static void *vmOpenVnodeInThread(void *param) {
×
UNCOV
494
  SVnodeThread *pThread = param;
×
UNCOV
495
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
×
496
  char          path[TSDB_FILENAME_LEN];
497

UNCOV
498
  dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
×
UNCOV
499
  setThreadName("open-vnodes");
×
500

UNCOV
501
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
×
UNCOV
502
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
×
UNCOV
503
    if (pCfg->dropped) {
×
504
      char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
505
      snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
×
506
               pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
507
      tmsgReportStartup("vnode-destroy", stepDesc);
×
508

509
      snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
510
      vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
×
511
      pThread->updateVnodesList = true;
×
512
      pThread->dropped++;
×
513
      (void)atomic_add_fetch_32(&pMgmt->state.dropVnodes, 1);
×
514
      continue;
×
515
    }
516

UNCOV
517
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
UNCOV
518
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
×
519
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
UNCOV
520
    tmsgReportStartup("vnode-open", stepDesc);
×
521

UNCOV
522
    if (pCfg->toVgId) {
×
523
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
×
524
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
×
525
        pThread->failed++;
×
526
        continue;
×
527
      }
528
      pThread->updateVnodesList = true;
×
529
    }
530

UNCOV
531
    int32_t diskPrimary = pCfg->diskPrimary;
×
UNCOV
532
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
×
533

UNCOV
534
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
×
535

UNCOV
536
    if (pImpl == NULL) {
×
537
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
×
538
      if (terrno != TSDB_CODE_NEED_RETRY) {
×
539
        pThread->failed++;
×
540
        continue;
×
541
      }
542
    }
543

UNCOV
544
    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
×
545
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
×
546
      pThread->failed++;
×
547
      continue;
×
548
    }
549

UNCOV
550
    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
×
UNCOV
551
    pThread->opened++;
×
UNCOV
552
    (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
×
553
  }
554

UNCOV
555
  dInfo("thread:%d, numOfVnodes:%d, opened:%d dropped:%d failed:%d", pThread->threadIndex, pThread->vnodeNum,
×
556
        pThread->opened, pThread->dropped, pThread->failed);
UNCOV
557
  return NULL;
×
558
}
559

UNCOV
560
// Initialises the running / closed / creating hashes, loads the vnode list from
// file and opens (or destroys) every listed vnode using tsNumOfCores / 2 worker
// threads (at least one). Configs are dealt to the threads round-robin.
//
// Returns 0 when opened + dropped equals the total number of vnodes and, if any
// thread requested it, the vnode list was rewritten. Returns a non-zero value
// on failure: TSDB_CODE_OUT_OF_MEMORY for hash init failures, terrno for
// allocation failures, -1 otherwise.
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->runngingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->runngingHash == NULL) {
    dError("failed to init vnode hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->closedHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->closedHash == NULL) {
    dError("failed to init vnode closed hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMgmt->creatingHash =
      taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->creatingHash == NULL) {
    dError("failed to init vnode creatingHash hash since %s", terrstr());
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // "+ 1" leaves room for the remainder of the round-robin distribution.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return terrno;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // The distribution loop below writes into pCfgs unconditionally, so bail
      // out here and undo the allocations made so far.
      dError("thread:%d, failed to allocate memory for vnode cfgs since %s", t, terrstr());
      int32_t allocCode = terrno;
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return allocCode;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if ((pMgmt->state.openVnodes + pMgmt->state.dropVnodes) != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    terrno = TSDB_CODE_VND_INIT_FAILED;
    return -1;
  }

  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("failed to write vnode list since %s", terrstr());
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}
658

UNCOV
659
static void *vmCloseVnodeInThread(void *param) {
×
UNCOV
660
  SVnodeThread *pThread = param;
×
UNCOV
661
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
×
662

UNCOV
663
  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
×
UNCOV
664
  setThreadName("close-vnodes");
×
665

UNCOV
666
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
×
UNCOV
667
    SVnodeObj *pVnode = pThread->ppVnodes[v];
×
668

UNCOV
669
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
UNCOV
670
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
×
671
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
UNCOV
672
    tmsgReportStartup("vnode-close", stepDesc);
×
673

UNCOV
674
    vmCloseVnode(pMgmt, pVnode, false, false);
×
675
  }
676

UNCOV
677
  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
×
UNCOV
678
  return NULL;
×
679
}
680

UNCOV
681
// Shuts down all vnodes: stops the mgmt workers, notifies the stream component
// of every vnode, closes the vnodes with tsNumOfCores / 2 worker threads (at
// least one), then frees the objects left in the closed / creating hashes and
// destroys all three hashes.
//
// If the vnode list cannot be obtained, or the thread array cannot be
// allocated, the function logs the error and returns without closing vnodes.
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  int32_t code = 0;
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
  tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
  dInfo("vnodes multiple mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = NULL;
  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // "+ 1" leaves room for the remainder of the round-robin distribution.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to allocate memory for threads since %s", terrstr());
    // NOTE(review): entries from vmGetVnodeListFromHash are released here the
    // same way vmCheckSyncTimeout releases them - confirm the list call
    // acquires one reference per entry.
    if (ppVnodes != NULL) {
      for (int32_t i = 0; i < numOfVnodes; ++i) {
        vmReleaseVnode(pMgmt, ppVnodes[i]);
      }
      taosMemoryFree(ppVnodes);
    }
    return;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    // A thread whose slot array could not be allocated simply gets no vnodes.
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  int64_t st = taosGetTimestampMs();
  dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
  if (ppVnodes != NULL) {
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] != NULL) {
        if (ppVnodes[i]->pImpl != NULL) {
          tqNotifyClose(ppVnodes[i]->pImpl->pTq);
        }
      }
    }
  }

  int64_t et = taosGetTimestampMs();
  dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->runngingHash != NULL) {
    taosHashCleanup(pMgmt->runngingHash);
    pMgmt->runngingHash = NULL;
  }

  // Objects still parked in the closed hash are owned by it; free them before the hash goes away.
  void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->closedHash, pIter);
  }

  if (pMgmt->closedHash != NULL) {
    taosHashCleanup(pMgmt->closedHash);
    pMgmt->closedHash = NULL;
  }

  // Same for placeholders left in the creating hash.
  pIter = taosHashIterate(pMgmt->creatingHash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    vmFreeVnodeObj(ppVnode);
    pIter = taosHashIterate(pMgmt->creatingHash, pIter);
  }

  if (pMgmt->creatingHash != NULL) {
    taosHashCleanup(pMgmt->creatingHash);
    pMgmt->creatingHash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}
794

UNCOV
795
// Tears down the vnode management module and frees pMgmt itself.
// Shutdown order: vnodes, then workers, then the vnode library, and only
// afterwards the locks embedded in pMgmt. pMgmt must not be used after return.
static void vmCleanup(SVnodeMgmt *pMgmt) {
  // Stop everything that may still be running.
  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();

  // Destroy the synchronization primitives owned by the manager.
  (void)taosThreadRwlockDestroy(&pMgmt->lock);
  (void)taosThreadMutexDestroy(&pMgmt->mutex);
  (void)taosThreadMutexDestroy(&pMgmt->fileLock);

  // Finally release the manager object.
  taosMemoryFree(pMgmt);
}
×
804

UNCOV
805
// Runs the sync-timeout check on every vnode returned by
// vmGetVnodeListFromHash(). Vnodes flagged as failed are skipped for the
// check, but every vnode in the list is released and the list is freed.
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     vnodeCount = 0;
  SVnodeObj **vnodeList = NULL;

  int32_t code = vmGetVnodeListFromHash(pMgmt, &vnodeCount, &vnodeList);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return;
  }

  // Nothing to inspect (and nothing to free) when no list was produced.
  if (vnodeList == NULL) return;

  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    SVnodeObj *pVnode = vnodeList[idx];
    if (!pVnode->failed) {
      vnodeSyncCheckTimeout(pVnode->pImpl);
    }
    // Release is unconditional: it pairs with the list acquisition above.
    vmReleaseVnode(pMgmt, pVnode);
  }

  taosMemoryFree(vnodeList);
}
826

UNCOV
827
static void *vmThreadFp(void *param) {
×
UNCOV
828
  SVnodeMgmt *pMgmt = param;
×
UNCOV
829
  int64_t     lastTime = 0;
×
UNCOV
830
  setThreadName("vnode-timer");
×
831

UNCOV
832
  while (1) {
×
UNCOV
833
    lastTime++;
×
UNCOV
834
    taosMsleep(100);
×
UNCOV
835
    if (pMgmt->stop) break;
×
UNCOV
836
    if (lastTime % 10 != 0) continue;
×
837

UNCOV
838
    int64_t sec = lastTime / 10;
×
UNCOV
839
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
×
UNCOV
840
      vmCheckSyncTimeout(pMgmt);
×
841
    }
842
  }
843

UNCOV
844
  return NULL;
×
845
}
846

UNCOV
847
// Starts the joinable "vnode-timer" thread running vmThreadFp().
//
// Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) if the thread could not
// be created. The thread attribute object is destroyed on both paths; the
// previous early return on failure skipped taosThreadAttrDestroy().
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // Read errno before any other call gets a chance to overwrite it.
    code = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create vnode timer thread since %s", tstrerror(code));
  }

  (void)taosThreadAttrDestroy(&thAttr);
  return code;
}
861

UNCOV
862
// Asks the timer thread to stop by setting pMgmt->stop and, if the thread
// handle is valid, joins the thread and clears the handle.
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  // No valid thread handle: there is nothing to join.
  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  (void)taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}
×
869

UNCOV
870
// Creates and initializes the vnode management module.
//
// Allocates the SVnodeMgmt, copies the settings from pInput, initializes its
// locks, then brings up wal, sync, the vnode library, the workers, the vnodes
// and the udf client in that order, reporting startup progress after each
// step.
//
// pInput   startup options (data, path, name, message callbacks, tfs handle).
// pOutput  receives the new manager in pOutput->pMgmt on success only.
//
// Returns 0 on success. On failure returns a non-zero code; the partially
// initialized manager is torn down with vmCleanup() and pOutput is untouched.
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Return directly: there is nothing to clean up yet, and vmCleanup()
    // dereferences pMgmt, so it must never be reached with NULL.
    code = terrno;
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    return code;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  // Override the inherited callbacks that must route through this module.
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;

  code = taosThreadRwlockInit(&pMgmt->lock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->mutex, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _OVER;
  }

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    // code is 0 here (left over from the successful mutex init above); without
    // resetting it the _OVER block would report success and hand out a manager
    // that has no tfs.
    code = -1;
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if ((code = walInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init wal since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if ((code = syncInit()) != 0) {
    dError("failed to open sync since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
    dError("failed to init vnode since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if ((code = vmStartWorker(pMgmt)) != 0) {
    dError("failed to init workers since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if ((code = vmOpenVnodes(pMgmt)) != 0) {
    dError("failed to open all vnodes since %s", tstrerror(code));
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if ((code = udfcOpen()) != 0) {
    dError("failed to open udfc in vnode since %s", tstrerror(code));
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", tstrerror(code));
    // pMgmt is guaranteed non-NULL on every path that reaches this label.
    vmCleanup(pMgmt);
  }

  return code;
}
959

UNCOV
960
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
×
UNCOV
961
  *required = tsNumOfSupportVnodes > 0;
×
UNCOV
962
  return 0;
×
963
}
964

UNCOV
965
static void *vmRestoreVnodeInThread(void *param) {
×
UNCOV
966
  SVnodeThread *pThread = param;
×
UNCOV
967
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
×
968

UNCOV
969
  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
×
UNCOV
970
  setThreadName("restore-vnodes");
×
971

UNCOV
972
  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
×
UNCOV
973
    SVnodeObj *pVnode = pThread->ppVnodes[v];
×
UNCOV
974
    if (pVnode->failed) {
×
975
      dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
×
976
      continue;
×
977
    }
978

UNCOV
979
    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
×
UNCOV
980
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
×
981
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
UNCOV
982
    tmsgReportStartup("vnode-restore", stepDesc);
×
983

UNCOV
984
    int32_t code = vnodeStart(pVnode->pImpl);
×
UNCOV
985
    if (code != 0) {
×
986
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
×
987
      pThread->failed++;
×
988
    } else {
UNCOV
989
      dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
×
UNCOV
990
      pThread->opened++;
×
UNCOV
991
      (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
×
992
    }
993
  }
994

UNCOV
995
  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
×
996
        pThread->failed);
UNCOV
997
  return NULL;
×
998
}
999

UNCOV
1000
// Restores all vnodes known to the manager using a pool of short-lived
// threads, then starts the vnode timer.
//
// The vnode list is fetched with vmGetVnodeListFromHash() and distributed
// round-robin over max(1, tsNumOfCores / 2) threads, each running
// vmRestoreVnodeInThread(). All threads are joined before returning.
//
// Returns the result of vmInitTimer() on the normal path, or an error code if
// the vnode list could not be fetched or a bookkeeping allocation failed.
// Individual vnode restore failures are logged by the worker threads and are
// not reflected in the return value.
//
// Every exit after the list was fetched goes through _exit, so the per-thread
// arrays, the thread table, the vnode references and the list itself are
// always released. Previously an allocation failure either leaked the vnode
// list or was silently ignored, leaving some vnodes unrestored.
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  int32_t       threadNum = 0;
  int32_t       vnodesPerThread = 0;
  SVnodeObj   **ppVnodes = NULL;
  SVnodeThread *threads = NULL;

  code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
  if (code != 0) {
    dError("failed to get vnode list since %s", tstrerror(code));
    return code;
  }

  threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // "+ 1" keeps each slot array large enough for the round-robin remainder.
  vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(*threads[t].ppVnodes));
    if (threads[t].ppVnodes == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  if (ppVnodes != NULL) {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      SVnodeThread *pThread = &threads[v % threadNum];
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  pMgmt->state.dropVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    (void)taosThreadAttrDestroy(&thAttr);
  }

  // Wait for every worker that was actually started.
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      (void)taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_exit:
  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    // Drop the references handed out together with the vnode list.
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) {
    dError("failed to restore vnodes since %s", tstrerror(code));
    return code;
  }

  return vmInitTimer(pMgmt);
}
1084

UNCOV
1085
// Stop hook of the vnode management module: shuts down the timer thread.
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
×
1086

UNCOV
1087
SMgmtFunc vmGetMgmtFunc() {
×
UNCOV
1088
  SMgmtFunc mgmtFunc = {0};
×
UNCOV
1089
  mgmtFunc.openFp = vmInit;
×
UNCOV
1090
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
×
UNCOV
1091
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
×
UNCOV
1092
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
×
UNCOV
1093
  mgmtFunc.requiredFp = vmRequire;
×
UNCOV
1094
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
×
1095

UNCOV
1096
  return mgmtFunc;
×
1097
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc