• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3564

24 Dec 2024 05:40AM UTC coverage: 62.21% (+1.2%) from 61.045%
#3564

push

travis-ci

web-flow
Merge pull request #29289 from taosdata/fix/TD-33270-2

ci(stream):add stream unit test

138331 of 285924 branches covered (48.38%)

Branch coverage included in aggregate %.

215800 of 283329 relevant lines covered (76.17%)

19198660.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.37
/source/dnode/vnode/src/vnd/vnodeOpen.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sync.h"
17
#include "tcs.h"
18
#include "tsdb.h"
19
#include "vnd.h"
20

21
void vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
212,357✔
22
  if (pTfs) {
212,357!
23
    SDiskID diskId = {0};
212,381✔
24
    diskId.id = diskPrimary;
212,381✔
25
    snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
212,381✔
26
  } else {
27
    snprintf(buf, bufLen - 1, "%s", relPath);
×
28
  }
29
  buf[bufLen - 1] = '\0';
212,348✔
30
}
212,348✔
31

32
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
21,635✔
33
  if (pTfs) {
21,635!
34
    return tfsMkdirRecur(pTfs, path);
21,636✔
35
  } else {
36
    return taosMkDir(path);
×
37
  }
38
}
39

40
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
9,625✔
41
  int32_t    code = 0;
9,625✔
42
  SVnodeInfo info = {0};
9,625✔
43
  char       dir[TSDB_FILENAME_LEN] = {0};
9,625✔
44

45
  // check config
46
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
9,625!
47
    vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(code));
×
48
    return code;
×
49
  }
50

51
  // create vnode env
52
  if (vnodeMkDir(pTfs, path)) {
9,665!
53
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(errno), path);
×
54
    return TAOS_SYSTEM_ERROR(errno);
×
55
  }
56
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
9,667✔
57

58
  if (pCfg) {
9,667!
59
    info.config = *pCfg;
9,667✔
60
  } else {
61
    info.config = vnodeCfgDefault;
×
62
  }
63
  info.state.committed = -1;
9,667✔
64
  info.state.applied = -1;
9,667✔
65
  info.state.commitID = 0;
9,667✔
66

67
  SVnodeInfo oldInfo = {0};
9,667✔
68
  oldInfo.config = vnodeCfgDefault;
9,667✔
69
  if (vnodeLoadInfo(dir, &oldInfo) == 0) {
9,667!
70
    code = (oldInfo.config.dbId == info.config.dbId) ? 0 : TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
71
    if (code == 0) {
×
72
      vWarn("vgId:%d, vnode config info already exists at %s.", oldInfo.config.vgId, dir);
×
73
    } else {
74
      vError("vgId:%d, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
75
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
76
             oldInfo.config.vgId, dir, oldInfo.config.dbId, oldInfo.config.dbname,
77
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
78
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
79
    }
80
    return code;
×
81
  }
82

83
  vInfo("vgId:%d, save config while create", info.config.vgId);
9,667✔
84
  if ((code = vnodeSaveInfo(dir, &info)) < 0 || (code = vnodeCommitInfo(dir)) < 0) {
9,667!
85
    vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(code));
×
86
    return code;
×
87
  }
88

89
  vInfo("vgId:%d, vnode is created", info.config.vgId);
9,667✔
90
  return 0;
9,667✔
91
}
92

93
bool vnodeShouldRemoveWal(SVnode *pVnode) { return pVnode->config.walCfg.clearFiles == 1; }
1,302✔
94

95
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
1,302✔
96
  SVnodeInfo info = {0};
1,302✔
97
  char       dir[TSDB_FILENAME_LEN] = {0};
1,302✔
98
  int32_t    ret = 0;
1,302✔
99

100
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
1,302✔
101

102
  ret = vnodeLoadInfo(dir, &info);
1,302✔
103
  if (ret < 0) {
1,302!
104
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
×
105
    return ret;
×
106
  }
107

108
  SSyncCfg *pCfg = &info.config.syncCfg;
1,302✔
109

110
  pCfg->replicaNum = 0;
1,302✔
111
  pCfg->totalReplicaNum = 0;
1,302✔
112
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
1,302✔
113

114
  for (int i = 0; i < pReq->replica; ++i) {
4,969✔
115
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
3,667✔
116
    pNode->nodeId = pReq->replicas[i].id;
3,667✔
117
    pNode->nodePort = pReq->replicas[i].port;
3,667✔
118
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
3,667✔
119
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
3,667✔
120
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
3,667✔
121
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
3,667!
122
    pCfg->replicaNum++;
3,667✔
123
  }
124
  if (pReq->selfIndex != -1) {
1,302!
125
    pCfg->myIndex = pReq->selfIndex;
1,302✔
126
  }
127
  for (int i = pCfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
1,615✔
128
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
313✔
129
    pNode->nodeId = pReq->learnerReplicas[pCfg->totalReplicaNum].id;
313✔
130
    pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
313✔
131
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
313✔
132
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
313✔
133
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
313✔
134
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
313!
135
    pCfg->totalReplicaNum++;
313✔
136
  }
137
  pCfg->totalReplicaNum += pReq->replica;
1,302✔
138
  if (pReq->learnerSelfIndex != -1) {
1,302!
139
    pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
×
140
  }
141
  pCfg->changeVersion = pReq->changeVersion;
1,302✔
142

143
  if (info.config.walCfg.clearFiles) {
1,302!
144
    info.config.walCfg.clearFiles = 0;
×
145

146
    vInfo("vgId:%d, reset wal clearFiles", pReq->vgId);
×
147
  }
148

149
  vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
1,302!
150
        pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
151

152
  info.config.syncCfg = *pCfg;
1,302✔
153
  ret = vnodeSaveInfo(dir, &info);
1,302✔
154
  if (ret < 0) {
1,302!
155
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
×
156
    return ret;
×
157
  }
158

159
  ret = vnodeCommitInfo(dir);
1,302✔
160
  if (ret < 0) {
1,302!
161
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
×
162
    return ret;
×
163
  }
164

165
  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
1,302!
166
  return 0;
1,302✔
167
}
168

169
static int32_t vnodeVgroupIdLen(int32_t vgId) {
198✔
170
  char tmp[TSDB_FILENAME_LEN];
171
  (void)tsnprintf(tmp, TSDB_FILENAME_LEN, "%d", vgId);
198✔
172
  return strlen(tmp);
198✔
173
}
174

175
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
40✔
176
                            int32_t diskPrimary, STfs *pTfs) {
177
  int32_t ret = 0;
40✔
178

179
  char oldRname[TSDB_FILENAME_LEN] = {0};
40✔
180
  char newRname[TSDB_FILENAME_LEN] = {0};
40✔
181
  char tsdbPath[TSDB_FILENAME_LEN] = {0};
40✔
182
  char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
40✔
183
  snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP);
40✔
184
  snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
40✔
185
  int32_t prefixLen = strlen(tsdbFilePrefix);
40✔
186

187
  STfsDir *tsdbDir = NULL;
40✔
188
  int32_t  tret = tfsOpendir(pTfs, tsdbPath, &tsdbDir);
40✔
189
  if (tsdbDir == NULL) {
40!
190
    return 0;
×
191
  }
192

193
  while (1) {
278✔
194
    const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
318✔
195
    if (tsdbFile == NULL) break;
318✔
196
    if (tsdbFile->rname[0] == '\0') continue;
358!
197
    tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);
278✔
198

199
    char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
278✔
200
    if (tsdbFilePrefixPos == NULL) continue;
278✔
201

202
    int32_t tsdbFileVgId = 0;
198✔
203
    ret = taosStr2int32(tsdbFilePrefixPos + prefixLen, &tsdbFileVgId);
198✔
204
    if (ret != 0) {
198!
205
      vError("vgId:%d, failed to get tsdb file vgid since %s", dstVgId, tstrerror(ret));
×
206
      tfsClosedir(tsdbDir);
×
207
      return ret;
×
208
    }
209

210
    if (tsdbFileVgId == srcVgId) {
198!
211
      char *tsdbFileSurfixPos = tsdbFilePrefixPos + prefixLen + vnodeVgroupIdLen(srcVgId);
198✔
212

213
      tsdbFilePrefixPos[prefixLen] = 0;
198✔
214
      snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
198✔
215
      vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);
198!
216

217
      ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname);
198✔
218
      if (ret != 0) {
198!
219
        vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
×
220
        tfsClosedir(tsdbDir);
×
221
        return ret;
×
222
      }
223
    }
224
  }
225

226
  tfsClosedir(tsdbDir);
40✔
227

228
  vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
40!
229
  ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
40✔
230
  if (ret != 0) {
40!
231
    vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
×
232
  }
233
  return ret;
40✔
234
}
235

236
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
40✔
237
                            int32_t diskPrimary, STfs *pTfs) {
238
  SVnodeInfo info = {0};
40✔
239
  char       dir[TSDB_FILENAME_LEN] = {0};
40✔
240
  int32_t    ret = 0;
40✔
241

242
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
40✔
243

244
  ret = vnodeLoadInfo(dir, &info);
40✔
245
  if (ret < 0) {
40!
246
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
×
247
    return ret;
×
248
  }
249

250
  vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
40!
251
        pReq->hashBegin, pReq->hashEnd);
252
  info.config.vgId = pReq->dstVgId;
40✔
253
  info.config.hashBegin = pReq->hashBegin;
40✔
254
  info.config.hashEnd = pReq->hashEnd;
40✔
255
  info.config.hashChange = true;
40✔
256
  info.config.walCfg.vgId = pReq->dstVgId;
40✔
257
  info.config.syncCfg.changeVersion = pReq->changeVersion;
40✔
258

259
  SSyncCfg *pCfg = &info.config.syncCfg;
40✔
260
  pCfg->myIndex = 0;
40✔
261
  pCfg->replicaNum = 1;
40✔
262
  pCfg->totalReplicaNum = 1;
40✔
263
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
40✔
264

265
  vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
40!
266
  SNodeInfo *pNode = &pCfg->nodeInfo[0];
40✔
267
  pNode->nodePort = tsServerPort;
40✔
268
  tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
40✔
269
  bool ret1 = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
40✔
270
  vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
40!
271

272
  info.config.syncCfg = *pCfg;
40✔
273

274
  ret = vnodeSaveInfo(dir, &info);
40✔
275
  if (ret < 0) {
40!
276
    vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
×
277
    return ret;
×
278
  }
279

280
  ret = vnodeCommitInfo(dir);
40✔
281
  if (ret < 0) {
40!
282
    vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
×
283
    return ret;
×
284
  }
285

286
  vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
40!
287
  ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
40✔
288
  if (ret < 0) {
40!
289
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
×
290
           tstrerror(terrno));
291
    return ret;
×
292
  }
293

294
  vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
40!
295
  return 0;
40✔
296
}
297

298
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
×
299
                             int32_t diskPrimary, STfs *pTfs) {
300
  SVnodeInfo info = {0};
×
301
  char       dir[TSDB_FILENAME_LEN] = {0};
×
302
  int32_t    code = 0;
×
303

304
  vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
×
305
  if (vnodeLoadInfo(dir, &info) == 0) {
×
306
    if (info.config.vgId != dstVgId) {
×
307
      vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
×
308
      return TSDB_CODE_FAILED;
×
309
    }
310
    return dstVgId;
×
311
  }
312

313
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
×
314
  if ((code = vnodeLoadInfo(dir, &info)) < 0) {
×
315
    vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
×
316
    return code;
×
317
  }
318

319
  if (info.config.vgId == srcVgId) {
×
320
    vInfo("vgId:%d, rollback alter hashrange", srcVgId);
×
321
    return srcVgId;
×
322
  } else if (info.config.vgId != dstVgId) {
×
323
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
×
324
    return TSDB_CODE_FAILED;
×
325
  }
326

327
  vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
×
328
  if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
×
329
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
×
330
    return TSDB_CODE_FAILED;
×
331
  }
332

333
  return dstVgId;
×
334
}
335

336
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
4,530✔
337
  vInfo("path:%s is removed while destroy vnode", path);
4,530✔
338
  if (tfsRmdir(pTfs, path) < 0) {
4,530!
339
    vError("failed to remove path:%s since %s", path, tstrerror(terrno));
×
340
  }
341

342
  // int32_t nlevel = tfsGetLevel(pTfs);
343
  if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
4,530!
344
    char vnode_prefix[TSDB_FILENAME_LEN];
345
    snprintf(vnode_prefix, TSDB_FILENAME_LEN, "%d/v%df", nodeId, vgId);
×
346
    tcsDeleteObjectsByPrefix(vnode_prefix);
×
347
  }
348
}
4,530✔
349

350
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
11,967✔
351
  int32_t ndisk = 1;
11,967✔
352
  if (pTfs) {
11,967!
353
    ndisk = tfsGetDisksAtLevel(pTfs, 0);
11,977✔
354
  }
355
  if (diskPrimary < 0 || diskPrimary >= ndisk) {
11,969!
356
    vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
×
357
    return terrno = TSDB_CODE_FS_INVLD_CFG;
×
358
  }
359
  return 0;
11,979✔
360
}
361

362
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool force) {
11,976✔
363
  SVnode    *pVnode = NULL;
11,976✔
364
  SVnodeInfo info = {0};
11,976✔
365
  char       dir[TSDB_FILENAME_LEN] = {0};
11,976✔
366
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
11,976✔
367
  int32_t    ret = 0;
11,976✔
368
  terrno = TSDB_CODE_SUCCESS;
11,976✔
369

370
  if (vnodeCheckDisk(diskPrimary, pTfs)) {
11,979!
371
    vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
×
372
    return NULL;
×
373
  }
374
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
11,979✔
375

376
  info.config = vnodeCfgDefault;
11,978✔
377

378
  // load vnode info
379
  vInfo("vgId:%d, start to vnode load info %s", info.config.vgId, dir);
11,978✔
380
  ret = vnodeLoadInfo(dir, &info);
11,989✔
381
  if (ret < 0) {
11,973!
382
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
×
383
    terrno = TSDB_CODE_NEED_RETRY;
×
384
    return NULL;
×
385
  }
386

387
  if (vnodeMkDir(pTfs, path)) {
11,973!
388
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", info.config.vgId, strerror(errno), path);
×
389
    return NULL;
×
390
  }
391
  // save vnode info on dnode ep changed
392
  bool      updated = false;
11,945✔
393
  SSyncCfg *pCfg = &info.config.syncCfg;
11,945✔
394
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
30,116✔
395
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
18,128✔
396
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
18,128!
397
      updated = true;
×
398
    }
399
  }
400
  if (updated) {
11,988!
401
    vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
×
402
    if (vnodeSaveInfo(dir, &info) < 0) {
×
403
      vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
×
404
    }
405

406
    if (vnodeCommitInfo(dir) < 0) {
×
407
      vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
×
408
    }
409
  }
410

411
  // create handle
412
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
11,960!
413
  if (pVnode == NULL) {
11,938!
414
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
415
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
×
416
    return NULL;
×
417
  }
418

419
  pVnode->path = (char *)&pVnode[1];
11,938✔
420
  memcpy(pVnode->path, path, strlen(path) + 1);
11,938✔
421
  pVnode->config = info.config;
11,938✔
422
  pVnode->state.committed = info.state.committed;
11,938✔
423
  pVnode->state.commitTerm = info.state.commitTerm;
11,938✔
424
  pVnode->state.commitID = info.state.commitID;
11,938✔
425
  pVnode->state.applied = info.state.committed;
11,938✔
426
  pVnode->state.applyTerm = info.state.commitTerm;
11,938✔
427
  pVnode->pTfs = pTfs;
11,938✔
428
  pVnode->diskPrimary = diskPrimary;
11,938✔
429
  pVnode->msgCb = msgCb;
11,938✔
430
  (void)taosThreadMutexInit(&pVnode->lock, NULL);
11,938✔
431
  pVnode->blocked = false;
11,926✔
432
  pVnode->disableWrite = false;
11,926✔
433

434
  if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
11,926!
435
    vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
×
436
    goto _err;
×
437
  }
438
  (void)taosThreadMutexInit(&pVnode->mutex, NULL);
11,961✔
439
  (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
11,974✔
440

441
  int8_t rollback = vnodeShouldRollback(pVnode);
11,919✔
442

443
  // open buffer pool
444
  vInfo("vgId:%d, start to open vnode buffer pool", TD_VID(pVnode));
11,967✔
445
  if (vnodeOpenBufPool(pVnode) < 0) {
11,982!
446
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
×
447
    goto _err;
×
448
  }
449

450
  // open meta
451
  vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
11,979✔
452
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
11,979!
453
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
×
454
    goto _err;
×
455
  }
456

457
  vInfo("vgId:%d, start to upgrade meta", TD_VID(pVnode));
11,979✔
458
  if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
11,980!
459
    vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
×
460
  }
461

462
  // open tsdb
463
  vInfo("vgId:%d, start to open vnode tsdb", TD_VID(pVnode));
11,978✔
464
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force) < 0) {
11,979!
465
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
×
466
    goto _err;
×
467
  }
468

469
  // open wal
470
  (void)tsnprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
11,979✔
471
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
11,977✔
472
  TAOS_UNUSED(ret);
473

474
  vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode));
11,977✔
475
  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
11,979✔
476
  if (pVnode->pWal == NULL) {
11,979!
477
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
×
478
    goto _err;
×
479
  }
480

481
  // open tq
482
  (void)tsnprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
11,979✔
483
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
11,979✔
484
  TAOS_UNUSED(ret);
485

486
  // open query
487
  vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
11,979✔
488
  if (vnodeQueryOpen(pVnode)) {
11,979!
489
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
×
490
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
491
    goto _err;
×
492
  }
493

494
  // sma required the tq is initialized before the vnode open
495
  vInfo("vgId:%d, start to open vnode tq", TD_VID(pVnode));
11,979✔
496
  if (tqOpen(tdir, pVnode)) {
11,979!
497
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
×
498
    goto _err;
×
499
  }
500

501
  // open sma
502
  vInfo("vgId:%d, start to open vnode sma", TD_VID(pVnode));
11,977✔
503
  if (smaOpen(pVnode, rollback, force)) {
11,979!
504
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
×
505
    goto _err;
×
506
  }
507

508
  // vnode begin
509
  vInfo("vgId:%d, start to begin vnode", TD_VID(pVnode));
11,979✔
510
  if (vnodeBegin(pVnode) < 0) {
11,979!
511
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
×
512
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
513
    goto _err;
×
514
  }
515

516
  // open sync
517
  vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
11,978✔
518
  if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
11,980!
519
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
×
520
    goto _err;
×
521
  }
522

523
  if (rollback) {
11,979!
524
    vnodeRollback(pVnode);
×
525
  }
526

527
  snprintf(pVnode->monitor.strClusterId, TSDB_CLUSTER_ID_LEN, "%" PRId64, pVnode->config.syncCfg.nodeInfo[0].clusterId);
11,979✔
528
  snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%" PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
11,979✔
529
  snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%" PRId32, pVnode->config.vgId);
11,979✔
530

531
  return pVnode;
11,979✔
532

533
_err:
×
534
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
×
535
  if (pVnode->pTq) tqClose(pVnode->pTq);
×
536
  if (pVnode->pWal) walClose(pVnode->pWal);
×
537
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
×
538
  if (pVnode->pSma) smaClose(pVnode->pSma);
×
539
  if (pVnode->pMeta) metaClose(&pVnode->pMeta);
×
540
  if (pVnode->freeList) vnodeCloseBufPool(pVnode);
×
541

542
  taosMemoryFree(pVnode);
×
543
  return NULL;
×
544
}
545

546
void vnodePreClose(SVnode *pVnode) {
11,978✔
547
  vnodeSyncPreClose(pVnode);
11,978✔
548
  vnodeQueryPreClose(pVnode);
11,979✔
549
}
11,977✔
550

551
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
11,979✔
552

553
void vnodeClose(SVnode *pVnode) {
11,979✔
554
  if (pVnode) {
11,979!
555
    vnodeAWait(&pVnode->commitTask);
11,979✔
556
    vnodeSyncClose(pVnode);
11,979✔
557
    vnodeQueryClose(pVnode);
11,978✔
558
    tqClose(pVnode->pTq);
11,979✔
559
    walClose(pVnode->pWal);
11,979✔
560
    if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
11,979!
561
    smaClose(pVnode->pSma);
11,977✔
562
    if (pVnode->pMeta) metaClose(&pVnode->pMeta);
11,971!
563
    vnodeCloseBufPool(pVnode);
11,977✔
564

565
    // destroy handle
566
    if (tsem_destroy(&pVnode->syncSem) != 0) {
11,979!
567
      vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
×
568
    }
569
    (void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
11,979✔
570
    (void)taosThreadMutexDestroy(&pVnode->mutex);
11,979✔
571
    (void)taosThreadMutexDestroy(&pVnode->lock);
11,979✔
572
    taosMemoryFree(pVnode);
11,979!
573
  }
574
}
11,979✔
575

576
// start the sync timer after the queue is ready
577
int32_t vnodeStart(SVnode *pVnode) {
11,979✔
578
  if (pVnode == NULL) {
11,979!
579
    return TSDB_CODE_INVALID_PARA;
×
580
  }
581
  return vnodeSyncStart(pVnode);
11,979✔
582
}
583

584
int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }
4,591✔
585

586
ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); }
4,591✔
587

588
int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm) { return syncUpdateArbTerm(pVnode->sync, arbTerm); }
16✔
589
int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken) { return syncGetArbToken(pVnode->sync, outToken); }
18✔
590

591
void vnodeStop(SVnode *pVnode) {}
×
592

593
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc