• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3910

23 Apr 2025 02:47AM UTC coverage: 62.362% (-0.7%) from 63.063%
#3910

push

travis-ci

web-flow
docs(datain): add missing health status types (#30828)

155061 of 317305 branches covered (48.87%)

Branch coverage included in aggregate %.

240172 of 316469 relevant lines covered (75.89%)

6269478.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.17
/source/dnode/vnode/src/vnd/vnodeOpen.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sync.h"
17
#include "tcs.h"
18
#include "tq.h"
19
#include "tsdb.h"
20
#include "vnd.h"
21

22
/**
 * Build the path of a vnode directory on its primary disk.
 *
 * With a tfs handle the result is "<disk path><TD_DIRSEP><relPath>", where the
 * disk path is what tfsGetDiskPath() returns for a zero-initialized SDiskID
 * whose `id` is diskPrimary. Without a handle, relPath is copied as-is.
 * The output is always NUL-terminated; over-long results are truncated.
 *
 * @param relPath     path to append (or copy when pTfs is NULL)
 * @param diskPrimary value stored into SDiskID.id
 * @param pTfs        tfs handle, may be NULL
 * @param buf         destination buffer
 * @param bufLen      size of buf in bytes; nothing is written when it is 0
 */
void vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
  // With bufLen == 0 the expression `bufLen - 1` would wrap to SIZE_MAX and
  // turn both the format call and the terminator write into overflows.
  if (buf == NULL || bufLen == 0) {
    return;
  }

  // snprintf already reserves one byte for the terminator, so the full
  // buffer size is passed instead of bufLen - 1.
  if (pTfs) {
    SDiskID diskId = {0};
    diskId.id = diskPrimary;
    snprintf(buf, bufLen, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
  } else {
    snprintf(buf, bufLen, "%s", relPath);
  }
  buf[bufLen - 1] = '\0';
}
220,858✔
32

33
/*
 * Create the directory `path`.
 * Uses tfsMkdirRecur() when a tfs handle is supplied and taosMkDir()
 * otherwise; the callee's return value is passed through unchanged.
 */
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
  if (pTfs == NULL) {
    return taosMkDir(path);
  }
  return tfsMkdirRecur(pTfs, path);
}
40

41
/**
 * Create the on-disk environment of a vnode and persist its initial info.
 *
 * Steps: validate the config, create the vnode directory, then save and commit
 * an SVnodeInfo whose state starts at committed = applied = -1, commitID = 0.
 * If vnode info can already be loaded from the target directory, nothing is
 * written: 0 is returned when the stored dbId matches, otherwise
 * TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH.
 *
 * @param path        vnode path handed to vnodeMkDir()/vnodeGetPrimaryDir()
 * @param pCfg        vnode config; when NULL, vnodeCfgDefault is stored
 * @param diskPrimary primary disk id
 * @param pTfs        tfs handle, may be NULL
 * @return 0 on success, otherwise the failing step's error code
 */
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
  int32_t    code = 0;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  // pCfg is optional further down, so log statements must not dereference it blindly.
  int32_t    vgId = pCfg ? pCfg->vgId : 0;

  // check config
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
    vError("vgId:%d, failed to create vnode since:%s", vgId, tstrerror(code));
    return code;
  }

  // create vnode env
  if (vnodeMkDir(pTfs, path)) {
    // Derive the return code before logging, since logging may overwrite the system error.
    code = TAOS_SYSTEM_ERROR(ERRNO);
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", vgId, strerror(ERRNO), path);
    return code;
  }
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  if (pCfg) {
    info.config = *pCfg;
  } else {
    info.config = vnodeCfgDefault;
  }
  info.state.committed = -1;
  info.state.applied = -1;
  info.state.commitID = 0;

  // An existing, loadable info file means the vnode was created before.
  SVnodeInfo oldInfo = {0};
  oldInfo.config = vnodeCfgDefault;
  if (vnodeLoadInfo(dir, &oldInfo) == 0) {
    code = (oldInfo.config.dbId == info.config.dbId) ? 0 : TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
    if (code == 0) {
      vWarn("vgId:%d, vnode config info already exists at %s.", oldInfo.config.vgId, dir);
    } else {
      vError("vgId:%d, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
             ", newDbId:%" PRId64 "(%s) at cluster:%" PRId64 ", code:%s",
             oldInfo.config.vgId, dir, oldInfo.config.dbId, oldInfo.config.dbname,
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
    }
    return code;
  }

  vInfo("vgId:%d, save config while create", info.config.vgId);
  if ((code = vnodeSaveInfo(dir, &info)) < 0 || (code = vnodeCommitInfo(dir)) < 0) {
    vError("vgId:%d, failed to save vnode config since %s", vgId, tstrerror(code));
    return code;
  }

  vInfo("vgId:%d, vnode is created", info.config.vgId);
  return 0;
}
93

94
/* Returns true when the vnode's WAL config has clearFiles equal to 1. */
bool vnodeShouldRemoveWal(SVnode *pVnode) {
  const bool clearRequested = (pVnode->config.walCfg.clearFiles == 1);
  return clearRequested;
}
1,240✔
95

96
/**
 * Rewrite the replica set stored in a vnode's info file.
 *
 * Loads the info from the primary dir of `path`, rebuilds syncCfg.nodeInfo[]
 * from the request (voters first, learners after them), resets
 * walCfg.clearFiles if it was set, then saves and commits the info.
 *
 * @param path        vnode path
 * @param pReq        alter request: replicas, learnerReplicas, self indexes, changeVersion
 * @param diskPrimary primary disk id
 * @param pTfs        tfs handle, may be NULL
 * @return 0 on success; the failing helper's code when load/save/commit fails;
 *         TSDB_CODE_INVALID_PARA (also stored in terrno) when the requested
 *         replica counts are negative or do not fit into nodeInfo[]
 */
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  int32_t    ret = 0;

  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
    return ret;
  }

  SSyncCfg *pCfg = &info.config.syncCfg;

  // Refuse counts that would make the loops below write past nodeInfo[].
  const int32_t maxNodes = (int32_t)(sizeof(pCfg->nodeInfo) / sizeof(pCfg->nodeInfo[0]));
  const int32_t nVoters = pReq->replica;
  const int32_t nLearners = pReq->learnerReplica;
  if (nVoters < 0 || nLearners < 0 || nVoters + nLearners > maxNodes) {
    vError("vgId:%d, invalid replica:%d learnerReplica:%d, at most %d nodes allowed", pReq->vgId, nVoters, nLearners,
           maxNodes);
    return terrno = TSDB_CODE_INVALID_PARA;
  }

  pCfg->replicaNum = 0;
  pCfg->totalReplicaNum = 0;
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));

  // Voters occupy nodeInfo[0 .. replica-1].
  for (int32_t i = 0; i < nVoters; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    // The "was anything updated" result is not needed here.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
    pCfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    pCfg->myIndex = pReq->selfIndex;
  }

  // Learners follow the voters; totalReplicaNum doubles as the learner index
  // during this loop and is topped up with the voter count afterwards.
  for (int32_t i = pCfg->replicaNum; i < nVoters + nLearners; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[pCfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
    pCfg->totalReplicaNum++;
  }
  pCfg->totalReplicaNum += pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  pCfg->changeVersion = pReq->changeVersion;

  if (info.config.walCfg.clearFiles) {
    info.config.walCfg.clearFiles = 0;

    vInfo("vgId:%d, reset wal clearFiles", pReq->vgId);
  }

  vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
        pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);

  // pCfg aliases info.config.syncCfg, so the edits above are already in `info`.
  ret = vnodeSaveInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
    return ret;
  }

  ret = vnodeCommitInfo(dir);
  if (ret < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
    return ret;
  }

  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
  return 0;
}
169

170
static int32_t vnodeVgroupIdLen(int32_t vgId) {
54✔
171
  char tmp[TSDB_FILENAME_LEN];
172
  (void)tsnprintf(tmp, TSDB_FILENAME_LEN, "%d", vgId);
54✔
173
  return strlen(tmp);
54✔
174
}
175

176
/**
 * Rename a vnode directory from srcPath to dstPath, first renaming the files
 * under "<srcPath>/tsdb" whose name embeds the source vgroup id.
 *
 * A file qualifies when its relative name contains "tsdb<TD_DIRSEP>v" and the
 * integer parsed right after that prefix equals srcVgId; the id digits are
 * then replaced by dstVgId. If the tsdb directory cannot be opened the
 * function returns 0 without renaming anything.
 *
 * @return 0 on success or when there is no tsdb dir; otherwise the code from
 *         taosStr2int32() or tfsRename() that failed
 */
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
                            int32_t diskPrimary, STfs *pTfs) {
  int32_t ret = 0;

  char oldRname[TSDB_FILENAME_LEN] = {0};
  char newRname[TSDB_FILENAME_LEN] = {0};
  char tsdbPath[TSDB_FILENAME_LEN] = {0};
  char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
  snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP);
  snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
  const int32_t prefixLen = strlen(tsdbFilePrefix);
  // Loop-invariant: the width of the source id text never changes.
  const int32_t srcVgIdLen = vnodeVgroupIdLen(srcVgId);

  STfsDir *tsdbDir = NULL;
  int32_t  tret = tfsOpendir(pTfs, tsdbPath, &tsdbDir);
  TAOS_UNUSED(tret);  // only the resulting handle decides whether to proceed
  if (tsdbDir == NULL) {
    return 0;
  }

  while (1) {
    const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
    if (tsdbFile == NULL) break;
    if (tsdbFile->rname[0] == '\0') continue;
    tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);

    char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
    if (tsdbFilePrefixPos == NULL) continue;

    int32_t tsdbFileVgId = 0;
    ret = taosStr2int32(tsdbFilePrefixPos + prefixLen, &tsdbFileVgId);
    if (ret != 0) {
      vError("vgId:%d, failed to get tsdb file vgid since %s", dstVgId, tstrerror(ret));
      tfsClosedir(tsdbDir);
      return ret;
    }

    if (tsdbFileVgId == srcVgId) {
      // Everything after the id digits is kept verbatim.
      char *tsdbFileSurfixPos = tsdbFilePrefixPos + prefixLen + srcVgIdLen;

      // Cut oldRname right after the prefix so it can serve as the new name's head.
      tsdbFilePrefixPos[prefixLen] = 0;
      snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
      vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);

      ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname);
      if (ret != 0) {
        vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
        tfsClosedir(tsdbDir);
        return ret;
      }
    }
  }

  tfsClosedir(tsdbDir);

  vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
  ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
  if (ret != 0) {
    vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
  }
  return ret;
}
236

237
/**
 * Re-home a vnode under a new vgroup id with a new hash range.
 *
 * Loads the info stored for srcPath, rewrites vgId, hashBegin/hashEnd and the
 * WAL vgId, marks hashChange, collapses the sync config to a single local
 * replica (tsLocalFqdn:tsServerPort), saves and commits the info, and finally
 * renames the vnode files/directory via vnodeRenameVgroupId().
 *
 * @param srcPath     current vnode path
 * @param dstPath     vnode path after the rename
 * @param pReq        request carrying srcVgId, dstVgId, hash range, changeVersion
 * @param diskPrimary primary disk id
 * @param pTfs        tfs handle
 * @return 0 on success, otherwise the negative code of the failing step
 */
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
                            int32_t diskPrimary, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  int32_t    ret = 0;

  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
    return ret;
  }

  vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
        pReq->hashBegin, pReq->hashEnd);
  info.config.vgId = pReq->dstVgId;
  info.config.hashBegin = pReq->hashBegin;
  info.config.hashEnd = pReq->hashEnd;
  info.config.hashChange = true;
  info.config.walCfg.vgId = pReq->dstVgId;
  info.config.syncCfg.changeVersion = pReq->changeVersion;

  // pCfg aliases info.config.syncCfg; edits through it land directly in `info`.
  SSyncCfg *pCfg = &info.config.syncCfg;
  pCfg->myIndex = 0;
  pCfg->replicaNum = 1;
  pCfg->totalReplicaNum = 1;
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));

  vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
  SNodeInfo *pNode = &pCfg->nodeInfo[0];
  pNode->nodePort = tsServerPort;
  // Bound the copy by the destination itself, as vnodeAlterReplica() does.
  tstrncpy(pNode->nodeFqdn, tsLocalFqdn, sizeof(pNode->nodeFqdn));
  // The "was anything updated" result is not needed here.
  (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
  vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);

  ret = vnodeSaveInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
    return ret;
  }

  ret = vnodeCommitInfo(dir);
  if (ret < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
    return ret;
  }

  vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
  ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
  if (ret < 0) {
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
           tstrerror(terrno));
    return ret;
  }

  vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
  return 0;
}
298

299
/**
 * Work out which vgroup id a vnode ended up with after an interrupted
 * hash-range alteration, finishing the rename when needed.
 *
 * - If info loads from dstPath: returns dstVgId when the stored vgId matches,
 *   TSDB_CODE_FAILED otherwise.
 * - Else info is loaded from srcPath: a stored vgId equal to srcVgId means the
 *   alteration is treated as rolled back (returns srcVgId); a stored vgId equal
 *   to dstVgId triggers vnodeRenameVgroupId() and returns dstVgId; anything
 *   else yields TSDB_CODE_FAILED.
 *
 * @return srcVgId or dstVgId on success, an error code on failure
 */
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
                             int32_t diskPrimary, STfs *pTfs) {
  SVnodeInfo vinfo = {0};
  char       infoDir[TSDB_FILENAME_LEN] = {0};
  int32_t    rc = 0;

  // First choice: the destination directory already exists with loadable info.
  vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, infoDir, TSDB_FILENAME_LEN);
  if (vnodeLoadInfo(infoDir, &vinfo) == 0) {
    if (vinfo.config.vgId == dstVgId) {
      return dstVgId;
    }
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, vinfo.config.vgId);
    return TSDB_CODE_FAILED;
  }

  // Otherwise fall back to the source directory.
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, infoDir, TSDB_FILENAME_LEN);
  rc = vnodeLoadInfo(infoDir, &vinfo);
  if (rc < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
    return rc;
  }

  if (vinfo.config.vgId == srcVgId) {
    vInfo("vgId:%d, rollback alter hashrange", srcVgId);
    return srcVgId;
  }
  if (vinfo.config.vgId != dstVgId) {
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, vinfo.config.vgId);
    return TSDB_CODE_FAILED;
  }

  // Info already carries dstVgId but the directory was never renamed.
  vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
  rc = vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
  if (rc < 0) {
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
    return TSDB_CODE_FAILED;
  }

  return dstVgId;
}
336

337
/**
 * Remove a vnode's directory and, in USE_S3 builds with tsS3Enabled set,
 * delete the objects stored under the prefix "<nodeId>/v<vgId>f".
 *
 * A failure to remove the directory is logged and otherwise ignored.
 */
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
  vInfo("path:%s is removed while destroy vnode", path);

  if (tfsRmdir(pTfs, path) < 0) {
    vError("failed to remove path:%s since %s", path, tstrerror(terrno));
  }

#ifdef USE_S3
  // NOTE: a tfs level condition (tfsGetLevel(pTfs) > 1) is not enforced here.
  const bool idsValid = (nodeId > 0) && (vgId > 0);
  if (idsValid && tsS3Enabled) {
    char vnode_prefix[TSDB_FILENAME_LEN];
    snprintf(vnode_prefix, TSDB_FILENAME_LEN, "%d/v%df", nodeId, vgId);
    tcsDeleteObjectsByPrefix(vnode_prefix);
  }
#endif
}
3,494✔
352

353
/*
 * Check that diskPrimary indexes one of the disks mounted at level 0.
 * Without a tfs handle exactly one disk (index 0) is assumed.
 * Returns 0 when valid; otherwise sets terrno to TSDB_CODE_FS_INVLD_CFG and
 * returns that value.
 */
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
  int32_t ndisk = 1;
  if (pTfs != NULL) {
    ndisk = tfsGetDisksAtLevel(pTfs, 0);
  }

  if (diskPrimary >= 0 && diskPrimary < ndisk) {
    return 0;
  }

  vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
  return terrno = TSDB_CODE_FS_INVLD_CFG;
}
364

365
/**
 * Open a vnode: load its info, then bring up buffer pool, meta, tsdb, wal,
 * notify-handle map, query, tq, sma and sync in that order.
 *
 * @param path        vnode path (copied into the returned handle)
 * @param diskPrimary primary disk id; validated against level-0 disks
 * @param pTfs        tfs handle
 * @param msgCb       message callbacks stored in the handle
 * @param force       forwarded to tsdbOpen() and smaOpen()
 * @return the opened handle, or NULL on failure with terrno describing the
 *         error where a step reported one. Every sub-module and every
 *         synchronization primitive created so far is released on failure.
 */
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool force) {
  SVnode    *pVnode = NULL;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
  int32_t    ret = 0;
  bool       semInited = false;  // tsem_init() may fail, so its teardown is conditional
  terrno = TSDB_CODE_SUCCESS;

  if (vnodeCheckDisk(diskPrimary, pTfs)) {
    vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
    return NULL;
  }
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  info.config = vnodeCfgDefault;

  // load vnode info
  vInfo("vgId:%d, start to vnode load info %s", info.config.vgId, dir);
  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
    terrno = TSDB_CODE_NEED_RETRY;
    return NULL;
  }

  if (vnodeMkDir(pTfs, path)) {
    // Derive the code before logging, since logging may overwrite the system error.
    int32_t sysCode = TAOS_SYSTEM_ERROR(ERRNO);
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", info.config.vgId, strerror(ERRNO), path);
    // terrno was reset above; do not hand back NULL with a "success" code.
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = sysCode;
    }
    return NULL;
  }

  // save vnode info on dnode ep changed
  bool      updated = false;
  SSyncCfg *pCfg = &info.config.syncCfg;
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
  }
  if (updated) {
    vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
    // Best effort, but never commit something that failed to save.
    if (vnodeSaveInfo(dir, &info) < 0) {
      vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
    } else if (vnodeCommitInfo(dir) < 0) {
      vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
    }
  }

  // create handle; the path string lives right behind the struct
  size_t pathSize = strlen(path) + 1;
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + pathSize);
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
    return NULL;
  }

  pVnode->path = (char *)&pVnode[1];
  memcpy(pVnode->path, path, pathSize);
  pVnode->config = info.config;
  pVnode->state.committed = info.state.committed;
  pVnode->state.commitTerm = info.state.commitTerm;
  pVnode->state.commitID = info.state.commitID;
  // applied state starts from the last committed state
  pVnode->state.applied = info.state.committed;
  pVnode->state.applyTerm = info.state.commitTerm;
  pVnode->pTfs = pTfs;
  pVnode->diskPrimary = diskPrimary;
  pVnode->msgCb = msgCb;
  pVnode->blocked = false;
  pVnode->disableWrite = false;

  // Initialize every lock/cond before the first `goto _err`, so the error
  // path can destroy all of them unconditionally.
  (void)taosThreadMutexInit(&pVnode->lock, NULL);
  (void)taosThreadMutexInit(&pVnode->mutex, NULL);
  (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
  (void)taosThreadRwlockInit(&pVnode->metaRWLock, NULL);

  if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
    vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
    goto _err;
  }
  semInited = true;

  int8_t rollback = vnodeShouldRollback(pVnode);

  // open buffer pool
  vInfo("vgId:%d, start to open vnode buffer pool", TD_VID(pVnode));
  if (vnodeOpenBufPool(pVnode) < 0) {
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open meta
  vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // a failed meta upgrade is logged but does not abort the open
  vInfo("vgId:%d, start to upgrade meta", TD_VID(pVnode));
  if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
    vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
  }

  // open tsdb
  vInfo("vgId:%d, start to open vnode tsdb", TD_VID(pVnode));
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force) < 0) {
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open wal
  (void)tsnprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
  TAOS_UNUSED(ret);

  vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode));
  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
  if (pVnode->pWal == NULL) {
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
    goto _err;
  }

  // tq directory; tdir is consumed by tqOpen() further down
  (void)tsnprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
  TAOS_UNUSED(ret);

  // init handle map for stream event notification
  ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap);
  if (ret != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode));
    terrno = ret;
    goto _err;
  }

  // open query
  vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
  if (vnodeQueryOpen(pVnode)) {
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // sma required the tq is initialized before the vnode open
  vInfo("vgId:%d, start to open vnode tq", TD_VID(pVnode));
  if (tqOpen(tdir, pVnode)) {
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open sma
  vInfo("vgId:%d, start to open vnode sma", TD_VID(pVnode));
  if (smaOpen(pVnode, rollback, force)) {
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // vnode begin
  vInfo("vgId:%d, start to begin vnode", TD_VID(pVnode));
  if (vnodeBegin(pVnode) < 0) {
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open sync
  vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
  if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  if (rollback) {
    vnodeRollback(pVnode);
  }

  // pre-rendered id strings for monitoring
  snprintf(pVnode->monitor.strClusterId, TSDB_CLUSTER_ID_LEN, "%" PRId64, pVnode->config.syncCfg.nodeInfo[0].clusterId);
  snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%" PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
  snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%" PRId32, pVnode->config.vgId);

  return pVnode;

_err:
  // Sub-modules first (handle was calloc'ed, so unopened ones are NULL) ...
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
  if (pVnode->pNotifyHandleMap) tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap);
  if (pVnode->pTq) tqClose(pVnode->pTq);
  if (pVnode->pWal) walClose(pVnode->pWal);
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
  if (pVnode->pSma) smaClose(pVnode->pSma);
  if (pVnode->pMeta) metaClose(&pVnode->pMeta);
  if (pVnode->freeList) vnodeCloseBufPool(pVnode);

  // ... then the synchronization primitives created above.
  if (semInited && tsem_destroy(&pVnode->syncSem) != 0) {
    vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
  }
  (void)taosThreadRwlockDestroy(&pVnode->metaRWLock);
  (void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
  (void)taosThreadMutexDestroy(&pVnode->mutex);
  (void)taosThreadMutexDestroy(&pVnode->lock);
  taosMemoryFree(pVnode);
  return NULL;
}
558

559
/* Pre-close hook: runs the sync pre-close step, then the query pre-close step. */
void vnodePreClose(SVnode *pVnode) {
  vnodeSyncPreClose(pVnode);

  vnodeQueryPreClose(pVnode);
}
11,688✔
563

564
/* Post-close hook: delegates to the sync module's post-close step. */
void vnodePostClose(SVnode *pVnode) {
  vnodeSyncPostClose(pVnode);
}
11,696✔
565

566
/**
 * Shut down and free a vnode handle. A NULL handle is ignored.
 *
 * Waits for the commit task, closes sync, query, notify-handle map, tq, wal,
 * tsdb, sma, meta and the buffer pool in that order, then destroys the
 * semaphore, condition variable and mutexes before freeing the handle.
 */
void vnodeClose(SVnode *pVnode) {
  if (pVnode == NULL) {
    return;
  }

  // Let a pending commit finish before anything is torn down.
  vnodeAWait(&pVnode->commitTask);

  vnodeSyncClose(pVnode);
  vnodeQueryClose(pVnode);
  tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap);
  tqClose(pVnode->pTq);
  walClose(pVnode->pWal);
  if (pVnode->pTsdb != NULL) {
    tsdbClose(&pVnode->pTsdb);
  }
  smaClose(pVnode->pSma);
  if (pVnode->pMeta != NULL) {
    metaClose(&pVnode->pMeta);
  }
  vnodeCloseBufPool(pVnode);

  // destroy handle
  // NOTE(review): metaRWLock is not destroyed here — confirm whether another module owns it.
  const int semRc = tsem_destroy(&pVnode->syncSem);
  if (semRc != 0) {
    vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
  }
  (void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
  (void)taosThreadMutexDestroy(&pVnode->mutex);
  (void)taosThreadMutexDestroy(&pVnode->lock);
  taosMemoryFree(pVnode);
}
11,696✔
589

590
// start the sync timer after the queue is ready
591
/* Returns TSDB_CODE_INVALID_PARA for a NULL handle, else the result of vnodeSyncStart(). */
int32_t vnodeStart(SVnode *pVnode) {
  if (pVnode != NULL) {
    return vnodeSyncStart(pVnode);
  }
  return TSDB_CODE_INVALID_PARA;
}
597

598
/* Forwards to syncIsCatchUp() for this vnode's sync handle. */
int32_t vnodeIsCatchUp(SVnode *pVnode) {
  return syncIsCatchUp(pVnode->sync);
}
3,840✔
599

600
/* Forwards to syncGetRole() for this vnode's sync handle. */
ESyncRole vnodeGetRole(SVnode *pVnode) {
  return syncGetRole(pVnode->sync);
}
3,840✔
601

602
/* Forwards arbTerm to syncUpdateArbTerm() for this vnode's sync handle. */
int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm) {
  return syncUpdateArbTerm(pVnode->sync, arbTerm);
}
88✔
603
/* Forwards to syncGetArbToken(), which receives outToken as its output buffer. */
int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken) {
  return syncGetArbToken(pVnode->sync, outToken);
}
105✔
604

605
/* Currently a no-op. */
void vnodeStop(SVnode *pVnode) {
  (void)pVnode;
}
×
606

607
/* Returns the value of the vnode's `sync` field. */
int64_t vnodeGetSyncHandle(SVnode *pVnode) {
  return pVnode->sync;
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc