• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5069

17 May 2026 01:15AM UTC coverage: 73.389% (+0.02%) from 73.368%
#5069

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281664 of 383795 relevant lines covered (73.39%)

139049899.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.41
/source/dnode/vnode/src/vnd/vnodeOpen.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sync.h"
17
#include "tss.h"
18
#include "tq.h"
19
#include "tsdb.h"
20
#include "vnd.h"
21

22
void vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
93,617,563✔
23
  if (pTfs) {
93,617,563✔
24
    SDiskID diskId = {0};
93,626,029✔
25
    diskId.id = diskPrimary;
93,626,029✔
26
    snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
93,626,029✔
27
  } else {
28
    snprintf(buf, bufLen - 1, "%s", relPath);
×
29
  }
30
  buf[bufLen - 1] = '\0';
93,617,084✔
31
}
93,653,036✔
32

33
void vnodeGetPrimaryPath(SVnode *pVnode, bool mount, char *buf, size_t bufLen) {
84,208,699✔
34
  if (pVnode->mounted) {
84,208,699✔
35
    if (mount) {  // mount path
30,492✔
36
      SDiskID diskId = {0};
×
37
      diskId.id = pVnode->diskPrimary;
×
38
      snprintf(buf, bufLen - 1, "%s%svnode%svnode%d", tfsGetDiskPath(pVnode->pMountTfs, diskId), TD_DIRSEP, TD_DIRSEP,
×
39
               pVnode->config.mountVgId);
40
    } else {  // host path
41
      vnodeGetPrimaryDir(pVnode->path, 0, pVnode->pTfs, buf, bufLen);
30,492✔
42
    }
43
    buf[bufLen - 1] = '\0';
30,492✔
44

45
  } else {
46
    vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, buf, bufLen);
84,182,096✔
47
  }
48
}
84,216,461✔
49

50
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
8,514,034✔
51
  if (pTfs) {
8,514,034✔
52
    return tfsMkdirRecur(pTfs, path);
8,524,798✔
53
  } else {
54
    return taosMkDir(path);
×
55
  }
56
}
57

58
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
3,622,760✔
59
  int32_t    code = 0;
3,622,760✔
60
  SVnodeInfo info = {0};
3,622,760✔
61
  char       dir[TSDB_FILENAME_LEN] = {0};
3,626,530✔
62

63
  // check config
64
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
3,626,530✔
65
    vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(code));
×
66
    return code;
×
67
  }
68

69
  // create vnode env
70
  if (vnodeMkDir(pTfs, path)) {
3,625,949✔
71
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(ERRNO), path);
×
72
    return TAOS_SYSTEM_ERROR(ERRNO);
×
73
  }
74
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
3,626,530✔
75

76
  if (pCfg) {
3,626,530✔
77
    info.config = *pCfg;
3,626,530✔
78
  } else {
79
    info.config = vnodeCfgDefault;
×
80
  }
81
  info.state.committed = -1;
3,626,530✔
82
  info.state.applied = -1;
3,626,530✔
83
  info.state.commitID = 0;
3,626,530✔
84

85
  SVnodeInfo oldInfo = {0};
3,626,530✔
86
  oldInfo.config = vnodeCfgDefault;
3,626,530✔
87
  if (vnodeLoadInfo(dir, &oldInfo) == 0) {
3,626,530✔
88
    code = (oldInfo.config.dbId == info.config.dbId) ? 0 : TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
89
    if (code == 0) {
×
90
      vWarn("vgId:%d, vnode config info already exists at %s.", oldInfo.config.vgId, dir);
×
91
    } else {
92
      vError("vgId:%d, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
93
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
94
             oldInfo.config.vgId, dir, oldInfo.config.dbId, oldInfo.config.dbname,
95
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
96
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
97
    }
98
    return code;
×
99
  }
100

101
  vInfo("vgId:%d, save config while create", info.config.vgId);
3,626,530✔
102
  if ((code = vnodeSaveInfo(dir, &info)) < 0 || (code = vnodeCommitInfo(dir)) < 0) {
3,626,530✔
103
    vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(code));
×
104
    return code;
×
105
  }
106

107
  vInfo("vgId:%d, vnode is created", info.config.vgId);
3,626,530✔
108
  return 0;
3,626,530✔
109
}
110

111
bool vnodeShouldRemoveWal(SVnode *pVnode) { return pVnode->config.walCfg.clearFiles == 1; }
867,829✔
112

113
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
867,829✔
114
  SVnodeInfo info = {0};
867,829✔
115
  char       dir[TSDB_FILENAME_LEN] = {0};
867,829✔
116
  int32_t    ret = 0;
867,829✔
117

118
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
867,829✔
119

120
  ret = vnodeLoadInfo(dir, &info);
867,829✔
121
  if (ret < 0) {
867,829✔
122
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
×
123
    return ret;
×
124
  }
125

126
  SSyncCfg *pCfg = &info.config.syncCfg;
867,829✔
127

128
  pCfg->replicaNum = 0;
867,829✔
129
  pCfg->totalReplicaNum = 0;
867,829✔
130
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
867,829✔
131

132
  for (int i = 0; i < pReq->replica; ++i) {
3,391,163✔
133
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
2,523,334✔
134
    pNode->nodeId = pReq->replicas[i].id;
2,523,334✔
135
    pNode->nodePort = pReq->replicas[i].port;
2,523,334✔
136
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
2,523,334✔
137
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
2,523,334✔
138
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
2,523,334✔
139
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
2,523,334✔
140
    pCfg->replicaNum++;
2,523,334✔
141
  }
142
  if (pReq->selfIndex != -1) {
867,829✔
143
    pCfg->myIndex = pReq->selfIndex;
867,829✔
144
  }
145
  for (int i = pCfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
1,083,334✔
146
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
215,505✔
147
    pNode->nodeId = pReq->learnerReplicas[pCfg->totalReplicaNum].id;
215,505✔
148
    pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
215,505✔
149
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
215,505✔
150
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
215,505✔
151
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
215,505✔
152
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
215,505✔
153
    pCfg->totalReplicaNum++;
215,505✔
154
  }
155
  pCfg->totalReplicaNum += pReq->replica;
867,829✔
156
  if (pReq->learnerSelfIndex != -1) {
867,829✔
157
    pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
×
158
  }
159
  pCfg->changeVersion = pReq->changeVersion;
867,829✔
160

161
  if (info.config.walCfg.clearFiles) {
867,829✔
162
    info.config.walCfg.clearFiles = 0;
×
163

164
    vInfo("vgId:%d, reset wal clearFiles", pReq->vgId);
×
165
  }
166

167
  vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
867,829✔
168
        pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
169

170
  info.config.syncCfg = *pCfg;
867,829✔
171
  ret = vnodeSaveInfo(dir, &info);
867,829✔
172
  if (ret < 0) {
867,829✔
173
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
×
174
    return ret;
×
175
  }
176

177
  ret = vnodeCommitInfo(dir);
867,829✔
178
  if (ret < 0) {
867,829✔
179
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
×
180
    return ret;
×
181
  }
182

183
  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
867,829✔
184
  return 0;
867,829✔
185
}
186

187
static int32_t vnodeVgroupIdLen(int32_t vgId) {
89,100✔
188
  char tmp[TSDB_FILENAME_LEN];
89,100✔
189
  (void)snprintf(tmp, TSDB_FILENAME_LEN, "%d", vgId);
89,100✔
190
  return strlen(tmp);
89,100✔
191
}
192

193
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
29,120✔
194
                            int32_t diskPrimary, STfs *pTfs) {
195
  int32_t ret = 0;
29,120✔
196

197
  char oldRname[TSDB_FILENAME_LEN] = {0};
29,120✔
198
  char newRname[TSDB_FILENAME_LEN] = {0};
29,120✔
199
  char tsdbPath[TSDB_FILENAME_LEN] = {0};
29,120✔
200
  char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
29,120✔
201
  snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP);
29,120✔
202
  snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
29,120✔
203
  int32_t prefixLen = strlen(tsdbFilePrefix);
29,120✔
204

205
  STfsDir *tsdbDir = NULL;
29,120✔
206
  int32_t  tret = tfsOpendir(pTfs, tsdbPath, &tsdbDir);
29,120✔
207
  if (tsdbDir == NULL) {
29,120✔
208
    return 0;
×
209
  }
210

211
  while (1) {
147,340✔
212
    const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
176,460✔
213
    if (tsdbFile == NULL) break;
176,460✔
214
    if (tsdbFile->rname[0] == '\0') continue;
147,340✔
215
    tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);
147,340✔
216

217
    char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
147,340✔
218
    if (tsdbFilePrefixPos == NULL) continue;
147,340✔
219

220
    int32_t tsdbFileVgId = 0;
89,100✔
221
    ret = taosStr2int32(tsdbFilePrefixPos + prefixLen, &tsdbFileVgId);
89,100✔
222
    if (ret != 0) {
89,100✔
223
      vError("vgId:%d, failed to get tsdb file vgid since %s", dstVgId, tstrerror(ret));
×
224
      tfsClosedir(tsdbDir);
×
225
      return ret;
×
226
    }
227

228
    if (tsdbFileVgId == srcVgId) {
89,100✔
229
      char *tsdbFileSurfixPos = tsdbFilePrefixPos + prefixLen + vnodeVgroupIdLen(srcVgId);
89,100✔
230

231
      tsdbFilePrefixPos[prefixLen] = 0;
89,100✔
232
      snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
89,100✔
233
      vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);
89,100✔
234

235
      ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname);
89,100✔
236
      if (ret != 0) {
89,100✔
237
        vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
×
238
        tfsClosedir(tsdbDir);
×
239
        return ret;
×
240
      }
241
    }
242
  }
243

244
  tfsClosedir(tsdbDir);
29,120✔
245

246
  vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
29,120✔
247
  ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
29,120✔
248
  if (ret != 0) {
29,120✔
249
    vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
×
250
  }
251
  return ret;
29,120✔
252
}
253

254
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
29,120✔
255
                            int32_t diskPrimary, STfs *pTfs) {
256
  SVnodeInfo info = {0};
29,120✔
257
  char       dir[TSDB_FILENAME_LEN] = {0};
29,120✔
258
  int32_t    ret = 0;
29,120✔
259

260
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
29,120✔
261

262
  ret = vnodeLoadInfo(dir, &info);
29,120✔
263
  if (ret < 0) {
29,120✔
264
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
×
265
    return ret;
×
266
  }
267

268
  vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
29,120✔
269
        pReq->hashBegin, pReq->hashEnd);
270
  info.config.vgId = pReq->dstVgId;
29,120✔
271
  info.config.hashBegin = pReq->hashBegin;
29,120✔
272
  info.config.hashEnd = pReq->hashEnd;
29,120✔
273
  info.config.hashChange = true;
29,120✔
274
  info.config.walCfg.vgId = pReq->dstVgId;
29,120✔
275
  info.config.syncCfg.changeVersion = pReq->changeVersion;
29,120✔
276

277
  SSyncCfg *pCfg = &info.config.syncCfg;
29,120✔
278
  pCfg->myIndex = 0;
29,120✔
279
  pCfg->replicaNum = 1;
29,120✔
280
  pCfg->totalReplicaNum = 1;
29,120✔
281
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
29,120✔
282

283
  vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
29,120✔
284
  SNodeInfo *pNode = &pCfg->nodeInfo[0];
29,120✔
285
  pNode->nodePort = tsServerPort;
29,120✔
286
  tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
29,120✔
287
  bool ret1 = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
29,120✔
288
  vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
29,120✔
289

290
  info.config.syncCfg = *pCfg;
29,120✔
291

292
  ret = vnodeSaveInfo(dir, &info);
29,120✔
293
  if (ret < 0) {
29,120✔
294
    vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
×
295
    return ret;
×
296
  }
297

298
  ret = vnodeCommitInfo(dir);
29,120✔
299
  if (ret < 0) {
29,120✔
300
    vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
×
301
    return ret;
×
302
  }
303

304
  vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
29,120✔
305
  ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
29,120✔
306
  if (ret < 0) {
29,120✔
307
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
×
308
           tstrerror(terrno));
309
    return ret;
×
310
  }
311

312
  vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
29,120✔
313
  return 0;
29,120✔
314
}
315

316
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
×
317
                             int32_t diskPrimary, STfs *pTfs) {
318
  SVnodeInfo info = {0};
×
319
  char       dir[TSDB_FILENAME_LEN] = {0};
×
320
  int32_t    code = 0;
×
321

322
  vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
×
323
  if (vnodeLoadInfo(dir, &info) == 0) {
×
324
    if (info.config.vgId != dstVgId) {
×
325
      vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
×
326
      return TSDB_CODE_FAILED;
×
327
    }
328
    return dstVgId;
×
329
  }
330

331
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
×
332
  if ((code = vnodeLoadInfo(dir, &info)) < 0) {
×
333
    vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
×
334
    return code;
×
335
  }
336

337
  if (info.config.vgId == srcVgId) {
×
338
    vInfo("vgId:%d, rollback alter hashrange", srcVgId);
×
339
    return srcVgId;
×
340
  } else if (info.config.vgId != dstVgId) {
×
341
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
×
342
    return TSDB_CODE_FAILED;
×
343
  }
344

345
  vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
×
346
  if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
×
347
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
×
348
    return TSDB_CODE_FAILED;
×
349
  }
350

351
  return dstVgId;
×
352
}
353

354
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
1,924,592✔
355
  vInfo("path:%s is removed while destroy vnode", path);
1,924,592✔
356
  if (tfsRmdir(pTfs, path) < 0) {
1,924,592✔
357
    vError("failed to remove path:%s since %s", path, tstrerror(terrno));
×
358
  }
359

360
#ifdef USE_SHARED_STORAGE
361
  if (nodeId > 0 && vgId > 0 && tsSsEnabled) {
1,924,592✔
362
    // we should only do this on the leader node, but it is ok to do this on all nodes
363
    char prefix[TSDB_FILENAME_LEN];
20,852✔
364
    snprintf(prefix, TSDB_FILENAME_LEN, "vnode%d/", vgId);
20,852✔
365
    int32_t code = tssDeleteFileByPrefixFromDefault(prefix);
20,852✔
366
    if (code < 0) {
20,852✔
367
      vError("vgId:%d, failed to remove vnode files from shared storage since %s", vgId, tstrerror(code));
20,050✔
368
    } else {
369
      vInfo("vgId:%d, removed vnode files from shared storage", vgId);
802✔
370
    }
371
  }
372
#endif
373
}
1,924,592✔
374

375
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
4,906,884✔
376
  int32_t ndisk = 1;
4,906,884✔
377
  if (pTfs) {
4,906,884✔
378
    ndisk = tfsGetDisksAtLevel(pTfs, 0);
4,906,884✔
379
  }
380
  if (diskPrimary < 0 || diskPrimary >= ndisk) {
4,905,104✔
381
    vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
492✔
382
    return terrno = TSDB_CODE_FS_INVLD_CFG;
492✔
383
  }
384
  return 0;
4,904,617✔
385
}
386

387
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, STfs *pMountTfs, SMsgCb msgCb, bool force) {
4,880,975✔
388
  SVnode    *pVnode = NULL;
4,880,975✔
389
  SVnodeInfo info = {0};
4,880,975✔
390
  char       dir[TSDB_FILENAME_LEN] = {0};
4,910,051✔
391
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
4,907,713✔
392
  int32_t    ret = 0;
4,907,859✔
393
  bool       mounted = pMountTfs != NULL;
4,907,859✔
394
  terrno = TSDB_CODE_SUCCESS;
4,907,859✔
395

396
  if (vnodeCheckDisk(diskPrimary, pTfs)) {
4,906,579✔
397
    vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
×
398
    return NULL;
×
399
  }
400
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
4,901,040✔
401

402
  info.config = vnodeCfgDefault;
4,903,401✔
403

404
  // load vnode info
405
  vInfo("vgId:%d, start to vnode load info %s", info.config.vgId, dir);
4,903,401✔
406
  ret = vnodeLoadInfo(dir, &info);
4,921,329✔
407
  if (ret < 0) {
4,901,770✔
408
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
×
409
    terrno = TSDB_CODE_NEED_RETRY;
×
410
    return NULL;
×
411
  }
412

413
  if (!mounted && vnodeMkDir(pTfs, path)) {
4,901,770✔
414
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", info.config.vgId, strerror(ERRNO), path);
7,583✔
415
    return NULL;
×
416
  }
417
  // save vnode info on dnode ep changed
418
  bool      updated = false;
4,892,963✔
419
  SSyncCfg *pCfg = &info.config.syncCfg;
4,892,963✔
420
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
13,092,664✔
421
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
8,194,129✔
422
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
8,183,609✔
423
      updated = true;
×
424
    }
425
  }
426
  if (updated) {
4,892,661✔
427
    vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
×
428
    if (vnodeSaveInfo(dir, &info) < 0) {
×
429
      vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
×
430
    }
431

432
    if (vnodeCommitInfo(dir) < 0) {
×
433
      vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
×
434
    }
435
  }
436

437
  // create handle
438
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
4,892,384✔
439
  if (pVnode == NULL) {
4,909,770✔
440
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
441
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
×
442
    return NULL;
×
443
  }
444

445
  pVnode->path = (char *)&pVnode[1];
4,909,770✔
446
  memcpy(pVnode->path, path, strlen(path) + 1);
4,909,770✔
447
  pVnode->config = info.config;
4,909,770✔
448
  pVnode->state.committed = info.state.committed;
4,910,247✔
449
  pVnode->state.commitTerm = info.state.commitTerm;
4,909,787✔
450
  pVnode->state.commitID = info.state.commitID;
4,909,787✔
451
  pVnode->state.applied = info.state.committed;
4,910,264✔
452
  pVnode->state.applyTerm = info.state.commitTerm;
4,909,770✔
453
  pVnode->pTfs = pTfs;
4,910,264✔
454
  pVnode->pMountTfs = pMountTfs;
4,909,756✔
455
  pVnode->mounted = mounted;
4,909,730✔
456
  pVnode->diskPrimary = diskPrimary;
4,910,250✔
457
  pVnode->msgCb = msgCb;
4,909,770✔
458
  (void)taosThreadMutexInit(&pVnode->lock, NULL);
4,909,716✔
459
  pVnode->blocked = false;
4,909,035✔
460
  pVnode->disableWrite = false;
4,909,538✔
461

462
  if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
4,909,236✔
463
    vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
×
464
    goto _err;
×
465
  }
466
  (void)taosThreadMutexInit(&pVnode->mutex, NULL);
4,908,746✔
467
  (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
4,909,781✔
468

469
  vInfo("vgId:%d, finished vnode load info %s, vnode committed:%" PRId64, info.config.vgId, dir,
4,908,239✔
470
        pVnode->state.committed);
471

472
  int8_t rollback = vnodeShouldRollback(pVnode);
4,909,270✔
473

474
  // open buffer pool
475
  vInfo("vgId:%d, start to open vnode buffer pool", TD_VID(pVnode));
4,909,992✔
476
  if (vnodeOpenBufPool(pVnode) < 0) {
4,910,939✔
477
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
×
478
    goto _err;
×
479
  }
480

481
  // open meta
482
  (void)taosThreadRwlockInit(&pVnode->metaRWLock, NULL);
4,910,264✔
483
  vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
4,910,264✔
484
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
4,910,264✔
485
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
91✔
486
    goto _err;
91✔
487
  }
488

489
  vInfo("vgId:%d, start to upgrade meta", TD_VID(pVnode));
4,908,636✔
490
  if (!mounted && metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
4,910,173✔
491
    vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
×
492
  }
493

494
  // open tsdb
495
  vInfo("vgId:%d, start to open vnode tsdb", TD_VID(pVnode));
4,907,438✔
496
  if ((terrno = tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force)) < 0) {
4,910,173✔
497
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
×
498
    goto _err;
×
499
  }
500

501
  if (TSDB_CACHE_RESET(pVnode->config)) {
4,897,751✔
502
    // flag vnode tsdb cache
503
    vInfo("vgId:%d, start to flag vnode tsdb cache", TD_VID(pVnode));
×
504

505
    if (metaFlagCache(pVnode) < 0) {
×
506
      vError("vgId:%d, failed to flag tsdb cache since %s", TD_VID(pVnode), tstrerror(terrno));
×
507
      goto _err;
×
508
    }
509
  }
510

511
  // open wal
512
  (void)snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
4,898,640✔
513
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
4,898,640✔
514
  TAOS_UNUSED(ret);
515

516
  vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode));
4,900,017✔
517
  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
4,908,640✔
518
  if (pVnode->pWal == NULL) {
4,907,893✔
519
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
×
520
    goto _err;
×
521
  }
522

523
  // open tq
524
  (void)snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
4,895,700✔
525
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
4,895,700✔
526
  TAOS_UNUSED(ret);
527

528
  // open query
529
  vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
4,909,353✔
530
  if (vnodeQueryOpen(pVnode)) {
4,913,416✔
531
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
×
532
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
533
    goto _err;
×
534
  }
535

536
  // sma required the tq is initialized before the vnode open
537
  vInfo("vgId:%d, start to open vnode tq", TD_VID(pVnode));
4,910,173✔
538
  if (tqOpen(tdir, pVnode)) {
4,910,173✔
539
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
×
540
    goto _err;
×
541
  }
542

543
  // open blob store engine
544
  vInfo("vgId:%d, start to open blob store engine", TD_VID(pVnode));
4,905,813✔
545
  (void)snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_BSE_DIR);
4,911,028✔
546

547
  SBseCfg cfg = {
9,818,457✔
548
      .vgId = pVnode->config.vgId,
4,910,173✔
549
      .keepDays = pVnode->config.tsdbCfg.days,
4,910,173✔
550
      .keeps = pVnode->config.tsdbCfg.keep0,
4,910,173✔
551
      .retention = pVnode->config.tsdbCfg.retentions[0],
552
      .precision = pVnode->config.tsdbCfg.precision,
4,910,173✔
553
  };
554
  tstrncpy(cfg.encryptAlgrName, pVnode->config.tdbEncryptData.encryptAlgrName, TSDB_ENCRYPT_ALGR_NAME_LEN);
4,910,173✔
555
  tstrncpy(cfg.encryptKey, tsDataKey, ENCRYPT_KEY_LEN + 1);
4,910,173✔
556

557
  ret = bseOpen(tdir, &cfg, &pVnode->pBse);
4,909,529✔
558
  if (ret != 0) {
4,910,173✔
559
    vError("vgId:%d, failed to open blob store engine since %s", TD_VID(pVnode), tstrerror(ret));
×
560
    terrno = ret;
×
561
    goto _err;
×
562
  }
563

564
  // vnode begin
565
  vInfo("vgId:%d, start to begin vnode", TD_VID(pVnode));
4,910,173✔
566
  if (vnodeBegin(pVnode) < 0) {
4,910,173✔
567
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
×
568
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
569
    goto _err;
×
570
  }
571

572
  // open sync
573
  vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
4,905,066✔
574
  if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
4,915,846✔
575
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
×
576
    goto _err;
×
577
  }
578

579
  if (rollback) {
4,910,173✔
580
    vnodeRollback(pVnode);
×
581
  }
582

583
  snprintf(pVnode->monitor.strClusterId, TSDB_CLUSTER_ID_LEN, "%" PRId64, pVnode->config.syncCfg.nodeInfo[0].clusterId);
4,910,173✔
584
  snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%" PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
4,910,173✔
585
  snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%" PRId32, pVnode->config.vgId);
4,910,173✔
586

587
  return pVnode;
4,910,173✔
588

589
_err:
91✔
590
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
91✔
591
  if (pVnode->pTq) tqClose(pVnode->pTq);
91✔
592
  if (pVnode->pWal) walClose(pVnode->pWal);
91✔
593
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
91✔
594
  if (pVnode->pMeta) metaClose(&pVnode->pMeta);
91✔
595
  if (pVnode->pBse) {
91✔
596
    bseClose(pVnode->pBse);
×
597
    pVnode->pBse = NULL;
×
598
  }
599
  if (pVnode->freeList) vnodeCloseBufPool(pVnode);
91✔
600

601
  (void)taosThreadRwlockDestroy(&pVnode->metaRWLock);
91✔
602
  taosMemoryFree(pVnode);
91✔
603
  return NULL;
91✔
604
}
605

606
void vnodePreClose(SVnode *pVnode) {
4,910,173✔
607
  streamRemoveVnodeLeader(pVnode->config.vgId);
4,910,173✔
608
  vnodeSyncPreClose(pVnode);
4,910,087✔
609
  vnodeQueryPreClose(pVnode);
4,909,654✔
610
}
4,909,287✔
611

612
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
4,910,173✔
613

614
void vnodeClose(SVnode *pVnode) {
4,910,115✔
615
  if (pVnode) {
4,910,115✔
616
    vInfo("start to close vnode");
4,910,115✔
617
    vnodeAWait(&pVnode->commitTask2);
4,910,173✔
618
    vnodeAWait(&pVnode->commitTask);
4,910,115✔
619
    vnodeSyncClose(pVnode);
4,910,115✔
620
    vnodeQueryClose(pVnode);
4,910,173✔
621
    tqClose(pVnode->pTq);
4,910,173✔
622
    walClose(pVnode->pWal);
4,910,173✔
623
    if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
4,908,671✔
624
    if (pVnode->pMeta) metaClose(&pVnode->pMeta);
4,909,511✔
625
    vnodeCloseBufPool(pVnode);
4,910,283✔
626

627
    if (pVnode->pBse) {
4,910,115✔
628
      bseClose(pVnode->pBse);
4,910,173✔
629
    }
630

631
    // destroy handle
632
    if (tsem_destroy(&pVnode->syncSem) != 0) {
4,907,084✔
633
      vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
×
634
    }
635
    (void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
4,906,390✔
636
    (void)taosThreadMutexDestroy(&pVnode->mutex);
4,904,981✔
637
    (void)taosThreadMutexDestroy(&pVnode->lock);
4,908,461✔
638
    taosMemoryFree(pVnode);
4,909,231✔
639
  }
640
}
4,906,187✔
641

642
// start the sync timer after the queue is ready
643
int32_t vnodeStart(SVnode *pVnode) {
4,909,900✔
644
  if (pVnode == NULL) {
4,909,900✔
645
    return TSDB_CODE_INVALID_PARA;
×
646
  }
647
  return vnodeSyncStart(pVnode);
4,909,900✔
648
}
649

650
int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }
2,473,604✔
651

652
ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); }
2,473,604✔
653

654
int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm) { return syncUpdateArbTerm(pVnode->sync, arbTerm); }
115,904✔
655
int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken) { return syncGetArbToken(pVnode->sync, outToken); }
148,545✔
656

657
int32_t vnodeSetWalKeepVersion(SVnode *pVnode, int64_t keepVersion) {
3,810✔
658
  if (pVnode == NULL || pVnode->pWal == NULL) {
3,810✔
659
    return TSDB_CODE_INVALID_PARA;
×
660
  }
661
  return walSetKeepVersion(pVnode->pWal, keepVersion);
3,810✔
662
}
663

664
void vnodeStop(SVnode *pVnode) {}
×
665

666
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc