• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.32
/source/dnode/vnode/src/vnd/vnodeOpen.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sync.h"
17
#include "tcs.h"
18
#include "tsdb.h"
19
#include "vnd.h"
20

21
/**
 * Build the path of a vnode directory on its primary disk.
 *
 * With a file system handle the result is "<disk path><TD_DIRSEP><relPath>",
 * where the disk path comes from tfsGetDiskPath() for an SDiskID whose id is
 * diskPrimary (all other fields zeroed). Without a handle, relPath is copied
 * unchanged. The output is always NUL-terminated and is truncated when it
 * does not fit.
 *
 * @param relPath      path relative to the disk root
 * @param diskPrimary  id of the primary disk; only read when pTfs is non-NULL
 * @param pTfs         file system handle, may be NULL
 * @param buf          destination buffer; nothing is written when it is NULL
 * @param bufLen       size of buf in bytes; nothing is written when it is 0
 */
void vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
  // bufLen is unsigned: with a zero-sized buffer, "bufLen - 1" below would wrap
  // to SIZE_MAX and turn both the snprintf bound and the terminator index into
  // out-of-bounds writes.
  if (buf == NULL || bufLen == 0) {
    return;
  }

  if (pTfs) {
    SDiskID diskId = {0};
    diskId.id = diskPrimary;
    snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
  } else {
    snprintf(buf, bufLen - 1, "%s", relPath);
  }
  // snprintf writes nothing when its bound is 0 (bufLen == 1), so terminate explicitly
  buf[bufLen - 1] = '\0';
}
241,654✔
31

32
// Create the directory `path`: through tfsMkdirRecur() when a file system
// handle is supplied, otherwise through taosMkDir(). The callee's return
// value is passed back unchanged.
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
  if (pTfs == NULL) {
    return taosMkDir(path);
  }
  return tfsMkdirRecur(pTfs, path);
}
39

40
/**
 * Create the directory of a new vnode and persist its initial info.
 *
 * Steps: validate the config with vnodeCheckCfg(), create the directory,
 * resolve the primary-disk path, and then save and commit a fresh SVnodeInfo
 * (committed = -1, applied = -1, commitID = 0). If info can already be loaded
 * from the directory, nothing is written.
 *
 * @param path         vnode path, relative to the disk root
 * @param pCfg         vnode config; when NULL, vnodeCfgDefault is stored
 * @param diskPrimary  id of the primary disk
 * @param pTfs         file system handle, may be NULL
 * @return 0 on success, or when existing info has the same dbId; -1 when
 *         existing info has a different dbId; the failing callee's code when
 *         the config check, save or commit fails; TAOS_SYSTEM_ERROR(errno)
 *         when the directory cannot be created.
 */
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
  int32_t    code = 0;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  // pCfg is treated as optional further down, so resolve the id used for
  // logging once instead of dereferencing pCfg in each error path.
  int32_t    vgId = pCfg ? pCfg->vgId : 0;

  // check config
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
    vError("vgId:%d, failed to create vnode since:%s", vgId, tstrerror(code));
    return code;
  }

  // create vnode env
  if (vnodeMkDir(pTfs, path)) {
    // take a snapshot of errno first: the logging call may overwrite it
    int32_t err = errno;
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", vgId, strerror(err), path);
    return TAOS_SYSTEM_ERROR(err);
  }
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  if (pCfg) {
    info.config = *pCfg;
  } else {
    info.config = vnodeCfgDefault;
  }
  info.state.committed = -1;
  info.state.applied = -1;
  info.state.commitID = 0;

  // an info file that already loads means the vnode was created before
  SVnodeInfo oldInfo = {0};
  oldInfo.config = vnodeCfgDefault;
  if (vnodeLoadInfo(dir, &oldInfo) == 0) {
    vWarn("vgId:%d, vnode config info already exists at %s.", oldInfo.config.vgId, dir);
    return (oldInfo.config.dbId == info.config.dbId) ? 0 : -1;
  }

  vInfo("vgId:%d, save config while create", info.config.vgId);
  if ((code = vnodeSaveInfo(dir, &info)) < 0 || (code = vnodeCommitInfo(dir)) < 0) {
    vError("vgId:%d, failed to save vnode config since %s", vgId, tstrerror(code));
    return code;
  }

  vInfo("vgId:%d, vnode is created", info.config.vgId);
  return 0;
}
83

84
// Report whether the vnode's WAL config has clearFiles set to exactly 1.
bool vnodeShouldRemoveWal(SVnode *pVnode) {
  return pVnode->config.walCfg.clearFiles == 1;
}
1,427✔
85

86
/**
 * Rewrite the replica set stored in a vnode's info file.
 *
 * Loads the info from the vnode's primary-disk directory, rebuilds
 * syncCfg.nodeInfo from the request (voters first, then learners), updates
 * myIndex and changeVersion, clears walCfg.clearFiles if it was set, and then
 * saves and commits the info.
 *
 * @param path         vnode path, relative to the disk root
 * @param pReq         alter request carrying the voter and learner replicas
 * @param diskPrimary  id of the primary disk
 * @param pTfs         file system handle, may be NULL
 * @return 0 on success; the failing callee's code when load, save or commit
 *         fails; TSDB_CODE_INVALID_PARA (also stored in terrno) when the
 *         requested replica counts do not fit in syncCfg.nodeInfo.
 */
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  int32_t    ret = 0;

  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
    return ret;
  }

  SSyncCfg *pCfg = &info.config.syncCfg;

  // The loops below index nodeInfo with values taken from the request, so
  // refuse counts that would run past the end of the array.
  const int32_t maxNodes = (int32_t)(sizeof(pCfg->nodeInfo) / sizeof(pCfg->nodeInfo[0]));
  if (pReq->replica < 0 || pReq->learnerReplica < 0 || pReq->replica + pReq->learnerReplica > maxNodes) {
    vError("vgId:%d, failed to alter replica since invalid replica:%d learnerReplica:%d, max:%d", pReq->vgId,
           pReq->replica, pReq->learnerReplica, maxNodes);
    return terrno = TSDB_CODE_INVALID_PARA;
  }

  pCfg->replicaNum = 0;
  pCfg->totalReplicaNum = 0;
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));

  // voters occupy slots [0, replica)
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    // the "updated" flag is not needed here: the info is saved unconditionally below
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
    pCfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    pCfg->myIndex = pReq->selfIndex;
  }

  // learners follow the voters; totalReplicaNum doubles as the index into
  // learnerReplicas while this loop runs
  for (int i = pCfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[pCfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
    pCfg->totalReplicaNum++;
  }
  // totalReplicaNum = learners counted above + voters
  pCfg->totalReplicaNum += pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  pCfg->changeVersion = pReq->changeVersion;

  if (info.config.walCfg.clearFiles) {
    info.config.walCfg.clearFiles = 0;

    vInfo("vgId:%d, reset wal clearFiles", pReq->vgId);
  }

  vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
        pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);

  // pCfg aliases info.config.syncCfg, so the edits above are already in info
  ret = vnodeSaveInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
    return ret;
  }

  ret = vnodeCommitInfo(dir);
  if (ret < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
    return ret;
  }

  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
  return 0;
}
159

160
// Number of characters in the decimal ("%d") rendering of vgId, counting the
// leading '-' of a negative value. Returns 0 if formatting fails.
static int32_t vnodeVgroupIdLen(int32_t vgId) {
  // With a zero-sized buffer snprintf writes nothing and returns the length
  // the output would have had, so no scratch buffer or strlen pass is needed.
  int len = snprintf(NULL, 0, "%d", vgId);
  return len < 0 ? 0 : (int32_t)len;
}
165

166
/**
 * Rename a vnode from srcVgId to dstVgId on disk.
 *
 * Walks "<srcPath><TD_DIRSEP>tsdb". For every entry whose relative name
 * contains "tsdb<TD_DIRSEP>v" followed by a number equal to srcVgId, that
 * number is replaced with dstVgId and the file is renamed. The vnode
 * directory itself is then renamed from srcPath to dstPath.
 *
 * @return 0 when the tsdb directory cannot be opened (nothing is renamed in
 *         that case, the vnode directory included); the first non-zero
 *         tfsRename() result if a file rename fails; otherwise the result of
 *         the final directory rename.
 */
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
                            int32_t diskPrimary, STfs *pTfs) {
  int32_t code = 0;

  char fromName[TSDB_FILENAME_LEN] = {0};
  char toName[TSDB_FILENAME_LEN] = {0};
  char tsdbRoot[TSDB_FILENAME_LEN] = {0};
  char marker[TSDB_FILENAME_LEN] = {0};
  snprintf(tsdbRoot, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP);
  snprintf(marker, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
  int32_t markerLen = strlen(marker);

  // only the out-parameter is inspected; the return code is intentionally unused
  STfsDir *pDir = NULL;
  int32_t  openCode = tfsOpendir(pTfs, tsdbRoot, &pDir);
  (void)openCode;
  if (pDir == NULL) {
    return 0;
  }

  for (const STfsFile *pFile = tfsReaddir(pDir); pFile != NULL; pFile = tfsReaddir(pDir)) {
    if (pFile->rname[0] == '\0') continue;
    tstrncpy(fromName, pFile->rname, TSDB_FILENAME_LEN);

    char *pMarker = strstr(fromName, marker);
    if (pMarker == NULL) continue;

    // the vgroup id starts right after the marker
    char *pId = pMarker + markerLen;
    if (atoi(pId) != srcVgId) continue;

    // remember where the text after the id starts, then cut fromName just
    // before the id so it can serve as the prefix of the new name
    char *pSuffix = pId + vnodeVgroupIdLen(srcVgId);
    *pId = 0;
    snprintf(toName, TSDB_FILENAME_LEN, "%s%d%s", fromName, dstVgId, pSuffix);
    vInfo("vgId:%d, rename file from %s to %s", dstVgId, pFile->rname, toName);

    code = tfsRename(pTfs, diskPrimary, pFile->rname, toName);
    if (code != 0) {
      vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, pFile->rname, toName, terrstr());
      tfsClosedir(pDir);
      return code;
    }
  }

  tfsClosedir(pDir);

  vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
  code = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
  if (code != 0) {
    vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
  }
  return code;
}
219

220
/**
 * Move a vnode to a new vgroup id and hash range.
 *
 * Loads the info stored under srcPath, sets vgId, walCfg.vgId, the hash range
 * and changeVersion from the request, and marks hashChange. The sync config
 * is reset to a single replica built from tsLocalFqdn and tsServerPort. The
 * info is then saved and committed, and the on-disk files are renamed with
 * vnodeRenameVgroupId().
 *
 * @return 0 on success, otherwise the negative code of the step that failed.
 */
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
                            int32_t diskPrimary, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  int32_t    code = 0;

  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  code = vnodeLoadInfo(dir, &info);
  if (code < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
    return code;
  }

  // log the old range before it is overwritten
  vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
        pReq->hashBegin, pReq->hashEnd);

  // retarget the config at the destination vgroup
  info.config.vgId = pReq->dstVgId;
  info.config.walCfg.vgId = pReq->dstVgId;
  info.config.hashBegin = pReq->hashBegin;
  info.config.hashEnd = pReq->hashEnd;
  info.config.hashChange = true;

  // collapse the sync config to one replica
  SSyncCfg *pSync = &info.config.syncCfg;
  pSync->changeVersion = pReq->changeVersion;
  pSync->myIndex = 0;
  pSync->replicaNum = 1;
  pSync->totalReplicaNum = 1;
  memset(&pSync->nodeInfo, 0, sizeof(pSync->nodeInfo));

  vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
  SNodeInfo *pSelf = &pSync->nodeInfo[0];
  pSelf->nodePort = tsServerPort;
  tstrncpy(pSelf->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  // nodeId and clusterId were zeroed above; the call receives pointers to both
  (void)tmsgUpdateDnodeInfo(&pSelf->nodeId, &pSelf->clusterId, pSelf->nodeFqdn, &pSelf->nodePort);
  vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pSelf->nodeFqdn, pSelf->nodePort, pSelf->nodeId);

  code = vnodeSaveInfo(dir, &info);
  if (code < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
    return code;
  }

  code = vnodeCommitInfo(dir);
  if (code < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
    return code;
  }

  vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
  code = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
  if (code < 0) {
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
           tstrerror(terrno));
    return code;
  }

  vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
  return 0;
}
281

282
/**
 * Work out which vgroup id a vnode ended up with after an interrupted
 * hash-range alter, finishing the rename if necessary.
 *
 * - Info loads from dstPath: returns dstVgId if the stored vgId matches,
 *   TSDB_CODE_FAILED otherwise.
 * - Otherwise info is loaded from srcPath; a load failure returns its code.
 *   - Stored vgId is srcVgId: the alter never took effect, returns srcVgId.
 *   - Stored vgId is dstVgId: the files are renamed with
 *     vnodeRenameVgroupId(), returning dstVgId or TSDB_CODE_FAILED.
 *   - Anything else: TSDB_CODE_FAILED.
 */
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
                             int32_t diskPrimary, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};

  // the destination directory already holds loadable info
  vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
  if (vnodeLoadInfo(dir, &info) == 0) {
    if (info.config.vgId == dstVgId) {
      return dstVgId;
    }
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
    return TSDB_CODE_FAILED;
  }

  // fall back to the source directory
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
  int32_t code = vnodeLoadInfo(dir, &info);
  if (code < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
    return code;
  }

  if (info.config.vgId == srcVgId) {
    vInfo("vgId:%d, rollback alter hashrange", srcVgId);
    return srcVgId;
  }
  if (info.config.vgId != dstVgId) {
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
    return TSDB_CODE_FAILED;
  }

  // the info already names dstVgId but the files still sit under srcPath
  vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
  if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
    return TSDB_CODE_FAILED;
  }

  return dstVgId;
}
319

320
/**
 * Remove a vnode's directory. When tsS3Enabled is set and both nodeId and
 * vgId are positive, also delete the objects stored under the prefix
 * "<nodeId>/v<vgId>f" through tcsDeleteObjectsByPrefix().
 *
 * A failure to remove the directory is logged and otherwise ignored.
 */
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
  vInfo("path:%s is removed while destroy vnode", path);
  if (tfsRmdir(pTfs, path) < 0) {
    vError("failed to remove path:%s since %s", path, tstrerror(terrno));
  }

  // nothing further to do unless the ids are valid and S3 is enabled
  if (nodeId <= 0 || vgId <= 0 || !tsS3Enabled) {
    return;
  }

  char objPrefix[TSDB_FILENAME_LEN];
  snprintf(objPrefix, TSDB_FILENAME_LEN, "%d/v%df", nodeId, vgId);
  tcsDeleteObjectsByPrefix(objPrefix);
}
5,177✔
333

334
// Check that diskPrimary is a valid index among the disks at level 0. The
// disk count comes from tfsGetDisksAtLevel(pTfs, 0), or is 1 when there is no
// file system handle. Returns 0 when valid; otherwise sets terrno to
// TSDB_CODE_FS_INVLD_CFG and returns it.
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
  int32_t diskCount = 1;
  if (pTfs != NULL) {
    diskCount = tfsGetDisksAtLevel(pTfs, 0);
  }

  if (diskPrimary >= 0 && diskPrimary < diskCount) {
    return 0;
  }

  vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, diskCount);
  return terrno = TSDB_CODE_FS_INVLD_CFG;
}
345

346
/**
 * Open a vnode and all of its sub-modules.
 *
 * Order of work: validate the primary disk, load the vnode info, make sure
 * the directory exists, refresh the dnode endpoints recorded in the sync
 * config (re-saving the info if any changed), and allocate the handle. Then
 * open buffer pool, meta, tsdb (skipped for RSMA vnodes), wal, query, tq and
 * sma, begin the vnode, open sync, and roll back if vnodeShouldRollback()
 * said so.
 *
 * @param path         vnode path, relative to the disk root
 * @param diskPrimary  id of the primary disk
 * @param pTfs         file system handle, may be NULL
 * @param msgCb        message callbacks, copied into the handle
 * @param force        forwarded to tsdbOpen() and smaOpen()
 * @return the opened vnode, or NULL on failure with terrno describing the
 *         error.
 */
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool force) {
  SVnode    *pVnode = NULL;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
  int32_t    ret = 0;
  terrno = TSDB_CODE_SUCCESS;

  if (vnodeCheckDisk(diskPrimary, pTfs)) {
    vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
    return NULL;
  }
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);

  info.config = vnodeCfgDefault;

  // load vnode info
  vInfo("vgId:%d, start to vnode load info %s", info.config.vgId, dir);
  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
    terrno = TSDB_CODE_NEED_RETRY;
    return NULL;
  }

  if (vnodeMkDir(pTfs, path)) {
    // take a snapshot of errno first: the logging call may overwrite it
    int32_t err = errno;
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", info.config.vgId, strerror(err), path);
    // terrno was reset to success on entry; do not hand back NULL without an
    // error code if the mkdir helper did not set one itself
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TAOS_SYSTEM_ERROR(err);
    }
    return NULL;
  }
  // save vnode info on dnode ep changed
  bool      updated = false;
  SSyncCfg *pCfg = &info.config.syncCfg;
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
  }
  if (updated) {
    // best effort: failures are logged and the open continues
    vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
    if (vnodeSaveInfo(dir, &info) < 0) {
      vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
    }

    if (vnodeCommitInfo(dir) < 0) {
      vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
    }
  }

  // create handle; the path string is stored in the same allocation, right
  // after the struct
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
    return NULL;
  }

  pVnode->path = (char *)&pVnode[1];
  strcpy(pVnode->path, path);
  pVnode->config = info.config;
  pVnode->state.committed = info.state.committed;
  pVnode->state.commitTerm = info.state.commitTerm;
  pVnode->state.commitID = info.state.commitID;
  // applied state starts from the committed state
  pVnode->state.applied = info.state.committed;
  pVnode->state.applyTerm = info.state.commitTerm;
  pVnode->pTfs = pTfs;
  pVnode->diskPrimary = diskPrimary;
  pVnode->msgCb = msgCb;
  (void)taosThreadMutexInit(&pVnode->lock, NULL);
  pVnode->blocked = false;
  pVnode->disableWrite = false;

  if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
    vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
    goto _err;
  }
  (void)taosThreadMutexInit(&pVnode->mutex, NULL);
  (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);

  if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) {
    vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
    goto _err;
  }

  int8_t rollback = vnodeShouldRollback(pVnode);

  // open buffer pool
  vInfo("vgId:%d, start to open vnode buffer pool", TD_VID(pVnode));
  if (vnodeOpenBufPool(pVnode) < 0) {
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open meta
  vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // a failed meta upgrade is logged but does not abort the open
  vInfo("vgId:%d, start to upgrade meta", TD_VID(pVnode));
  if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
    vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
  }

  // open tsdb
  vInfo("vgId:%d, start to open vnode tsdb", TD_VID(pVnode));
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force) < 0) {
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open wal
  snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
  TAOS_UNUSED(ret);

  vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode));
  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
  if (pVnode->pWal == NULL) {
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
    goto _err;
  }

  // open tq (the path is prepared here and consumed by tqOpen below)
  snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
  TAOS_UNUSED(ret);

  // open query
  vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
  if (vnodeQueryOpen(pVnode)) {
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // sma required the tq is initialized before the vnode open
  vInfo("vgId:%d, start to open vnode tq", TD_VID(pVnode));
  if (tqOpen(tdir, pVnode)) {
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open sma
  vInfo("vgId:%d, start to open vnode sma", TD_VID(pVnode));
  if (smaOpen(pVnode, rollback, force)) {
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // vnode begin
  vInfo("vgId:%d, start to begin vnode", TD_VID(pVnode));
  if (vnodeBegin(pVnode) < 0) {
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open sync
  vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
  if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  if (rollback) {
    vnodeRollback(pVnode);
  }

  // monitoring labels are taken from the first entry of the sync config
  snprintf(pVnode->monitor.strClusterId, TSDB_CLUSTER_ID_LEN, "%" PRId64, pVnode->config.syncCfg.nodeInfo[0].clusterId);
  snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%" PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
  snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%" PRId32, pVnode->config.vgId);

  return pVnode;

_err:
  // NOTE(review): this path closes the sub-modules but does not destroy
  // pVnode->lock, syncSem, mutex, poolNotEmpty or commitChannel, which
  // vnodeClose() does release. Confirm whether that is intentional.
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
  if (pVnode->pTq) tqClose(pVnode->pTq);
  if (pVnode->pWal) walClose(pVnode->pWal);
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
  if (pVnode->pSma) smaClose(pVnode->pSma);
  if (pVnode->pMeta) metaClose(&pVnode->pMeta);
  if (pVnode->freeList) vnodeCloseBufPool(pVnode);

  taosMemoryFree(pVnode);
  return NULL;
}
534

535
// First stage of closing a vnode: run the sync pre-close hook, then the
// query pre-close hook.
void vnodePreClose(SVnode *pVnode) {
  vnodeSyncPreClose(pVnode);

  vnodeQueryPreClose(pVnode);
}
13,695✔
539

540
// Last stage of closing a vnode: run the sync post-close hook.
void vnodePostClose(SVnode *pVnode) {
  vnodeSyncPostClose(pVnode);
}
13,702✔
541

542
/**
 * Close a vnode and free its handle. A NULL handle is ignored.
 *
 * Waits for the pending commit task and destroys the commit channel. It then
 * closes sync, query, tq, wal, tsdb, sma, meta and the buffer pool, destroys
 * the semaphore, condition variable and mutexes, and finally frees the handle.
 */
void vnodeClose(SVnode *pVnode) {
  if (pVnode == NULL) {
    return;
  }

  vnodeAWait(&pVnode->commitTask);
  if (vnodeAChannelDestroy(&pVnode->commitChannel, true) != 0) {
    vError("vgId:%d, failed to destroy commit channel", TD_VID(pVnode));
  }

  // close sub-modules
  vnodeSyncClose(pVnode);
  vnodeQueryClose(pVnode);
  tqClose(pVnode->pTq);
  walClose(pVnode->pWal);
  if (pVnode->pTsdb) {
    tsdbClose(&pVnode->pTsdb);
  }
  smaClose(pVnode->pSma);
  if (pVnode->pMeta) {
    metaClose(&pVnode->pMeta);
  }
  vnodeCloseBufPool(pVnode);

  // destroy handle
  if (tsem_destroy(&pVnode->syncSem) != 0) {
    vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
  }
  (void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
  (void)taosThreadMutexDestroy(&pVnode->mutex);
  (void)taosThreadMutexDestroy(&pVnode->lock);
  taosMemoryFree(pVnode);
}
13,702✔
568

569
// start the sync timer after the queue is ready
570
int32_t vnodeStart(SVnode *pVnode) {
13,702✔
571
  if (pVnode == NULL) {
13,702!
UNCOV
572
    return TSDB_CODE_INVALID_PARA;
×
573
  }
574
  return vnodeSyncStart(pVnode);
13,702✔
575
}
576

577
// Forward to syncIsCatchUp() with the vnode's sync handle.
int32_t vnodeIsCatchUp(SVnode *pVnode) {
  return syncIsCatchUp(pVnode->sync);
}
5,250✔
578

579
// Forward to syncGetRole() with the vnode's sync handle.
ESyncRole vnodeGetRole(SVnode *pVnode) {
  return syncGetRole(pVnode->sync);
}
5,250✔
580

581
// Forward arbTerm to syncUpdateArbTerm() with the vnode's sync handle.
int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm) {
  return syncUpdateArbTerm(pVnode->sync, arbTerm);
}
18✔
582
// Forward to syncGetArbToken() with the vnode's sync handle and the caller's
// outToken buffer; the callee's result is returned unchanged.
int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken) {
  return syncGetArbToken(pVnode->sync, outToken);
}
18✔
583

UNCOV
584
// Currently a no-op; the handle is not touched.
void vnodeStop(SVnode *pVnode) {
  (void)pVnode;
}
×
585

UNCOV
586
// Return the vnode's sync handle (pVnode->sync).
int64_t vnodeGetSyncHandle(SVnode *pVnode) {
  return pVnode->sync;
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc